# Copyright 2023 J.P. Morgan Chase & Co.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
# specific language governing permissions and limitations under the License.
"""
Round-trip serialization/deserialization of any Python object to/from any serialization format including Avro and JSON.
"""
import abc
import copy
import dataclasses
import datetime
import enum
import importlib
import importlib.metadata
import inspect
import io
import logging
import uuid
from collections.abc import Iterable, Iterator
from typing import (
Any,
BinaryIO,
Callable,
Dict,
List,
Optional,
Type,
TypeVar,
Union,
cast,
)
import avro.schema
import dateutil.parser
import memoization
import more_itertools
import orjson
import py_avro_schema as pas
import py_adapter._schema
import py_adapter.plugin
#: Library version, e.g. 1.0.0, taken from Git tags
#: (read at import time from the metadata of the installed ``py-adapter`` distribution)
__version__ = importlib.metadata.version("py-adapter")
# Module logger, named after the package this module belongs to
logger = logging.getLogger(__package__)
# Elementary serializable data types
# Scalar values that need no conversion
Primitives = Union[None, bool, str, int, float]
# Date/time values; these are kept as objects only when ``to_basic_type`` is called with
# ``datetime_type=datetime.datetime``, otherwise they become ``int`` or ``str``
Logicals = Union[datetime.datetime, datetime.date]
# A record: string keys mapped to basic values (e.g. the converted form of a dataclass instance)
Record = Dict[str, "Basic"]
# A list of basic values
Array = List["Basic"]
# Any (nested) combination of the types above; this is the output type of ``to_basic_type``
Basic = Union[Primitives, Logicals, Array, Record]
# TODO: support datetime as nanosecond integer
[docs]
def to_basic_type(obj: Any, *, datetime_type: Type = datetime.datetime, json_type: Type = str) -> Basic:
    """
    Convert an object into a data structure using "basic" types only as a pre-serialization step.

    :param obj: The object to convert
    :param datetime_type: The type to convert datetime objects to. Supported types include :class:`int` (timestamp),
                          :class:`str` (ISO-format), and :class:`datetime.datetime` (no conversion).
    :param json_type: The type to convert dataclass "JSON" dict fields to. Fields are identified by having custom
                      meta data ``{"py_adapter": {"logical_type": "json" }}``. If set to ``str`` (default), field
                      values are serialized as JSON strings. Set to ``dict`` for no conversion.
    :return: The converted data structure, built from basic types only
    """
    adapter = _DictAdapter(datetime_type=datetime_type, json_type=json_type)
    return adapter.adapt(obj)
# Type variable tying a ``py_type`` argument to the type of the object returned (see ``from_basic_type`` and the
# ``deserialize*`` functions)
Obj = TypeVar("Obj")
[docs]
def from_basic_type(basic_obj: Basic, py_type: Type[Obj]) -> Obj:
    """
    Convert a data structure with "basic" types only into a Python object of a given type

    :param basic_obj: Any valid data structure that can be used to create an instance of ``py_type``
    :param py_type: The Python class to create an instance from
    :return: The object built from ``basic_obj`` using a schema derived from ``py_type``
    """
    return _ObjectAdapter.for_py_type(py_type).adapt(basic_obj)
[docs]
def serialize(obj: Any, *, format: str, writer_schema: bytes = b"") -> bytes:
    """
    Serialize an object using a serialization format supported by **py-adapter**

    :param obj: Python object to serialize
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema to serialize the data with, as JSON bytes.
    :return: The serialized data
    """
    buffer = io.BytesIO()
    serialize_to_stream(obj, buffer, format=format, writer_schema=writer_schema)
    # getvalue() returns the complete buffer contents, independent of the current stream position
    return buffer.getvalue()
[docs]
def serialize_to_stream(obj: Any, stream: BinaryIO, *, format: str, writer_schema: bytes = b"") -> None:
    """
    Serialize an object to a file-like object using a serialization format supported by **py-adapter**

    The object is first converted to basic types; the format plugin then writes it to ``stream``.

    :param obj: Python object to serialize
    :param stream: File like object to write the serialized data into
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema to serialize the data with, as JSON bytes.
    """
    # Look up the plugin before converting, so an unknown format fails fast
    plugin_serialize = py_adapter.plugin.plugin_hook(format, "serialize")
    plugin_serialize(obj=to_basic_type(obj), stream=stream, py_type=type(obj), writer_schema=writer_schema)
[docs]
def serialize_many(objs: Iterable[Any], *, format: str, writer_schema: bytes = b"") -> bytes:
    """
    Serialize multiple objects using a serialization format supported by **py-adapter**

    :param objs: Python objects to serialize
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema to serialize the data with, as JSON bytes.
    :return: The serialized data for all objects
    """
    buffer = io.BytesIO()
    serialize_many_to_stream(objs, buffer, format=format, writer_schema=writer_schema)
    # getvalue() returns the complete buffer contents, independent of the current stream position
    return buffer.getvalue()
[docs]
def serialize_many_to_stream(objs: Iterable[Any], stream: BinaryIO, *, format: str, writer_schema: bytes = b"") -> None:
    """
    Serialize multiple objects to a file-like object using a serialization format supported by **py-adapter**

    :param objs: Python objects to serialize. All objects are assumed to be of the same type as the first one.
    :param stream: File like object to write the serialized data into
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema to serialize the data with, as JSON bytes.
    :raises ValueError: If ``objs`` is empty, because the Python type to serialize cannot be determined then.
    """
    serialize_fn = py_adapter.plugin.plugin_hook(format, "serialize_many")
    # Peek at the first object to find the class, assuming all objects share the same type. ``spy`` returns the
    # peeked items plus an iterator that still yields every object, including the peeked one.
    head, objs = more_itertools.spy(objs)
    if not head:
        raise ValueError("Cannot serialize an empty collection of objects: unable to determine the Python type")
    py_type = type(head[0])
    # Lazily convert each object so that large iterables are not materialized in memory
    basic_objs = (to_basic_type(obj) for obj in objs)
    serialize_fn(objs=basic_objs, stream=stream, py_type=py_type, writer_schema=writer_schema)
[docs]
def deserialize(
    data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Obj:
    """
    Deserialize bytes as a Python object of a given type from a serialization format supported by **py-adapter**

    :param data: Serialized data
    :param py_type: The Python class to create an instance from
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
    :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
                          compatible with the writer schema.
    :return: The deserialized object
    """
    return deserialize_from_stream(
        io.BytesIO(data),  # Wrap the raw bytes in an in-memory stream
        py_type,
        format=format,
        writer_schema=writer_schema,
        reader_schema=reader_schema,
    )
[docs]
def deserialize_from_stream(
    stream: BinaryIO, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Obj:
    """
    Deserialize a file-like object as a Python object of a given type from a serialization format supported by
    **py-adapter**

    The format plugin reads basic types from ``stream``, which are then converted into an instance of ``py_type``.

    :param stream: File-like object to deserialize
    :param py_type: The Python class to create an instance from
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
    :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
                          compatible with the writer schema.
    :return: The deserialized object
    """
    plugin_deserialize = py_adapter.plugin.plugin_hook(format, "deserialize")
    basic_data = plugin_deserialize(
        stream=stream, py_type=py_type, writer_schema=writer_schema, reader_schema=reader_schema
    )
    return from_basic_type(basic_data, py_type)
[docs]
def deserialize_many(
    data: bytes, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Iterator[Obj]:
    """
    Deserialize bytes as an iterator over Python objects of a given type from a serialization format supported by
    **py-adapter**

    :param data: Serialized data
    :param py_type: The Python class to create an instance from
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
    :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
                          compatible with the writer schema.
    :return: The iterator produced by ``deserialize_many_from_stream`` for an in-memory stream over ``data``
    """
    return deserialize_many_from_stream(
        io.BytesIO(data),
        py_type,
        format=format,
        writer_schema=writer_schema,
        reader_schema=reader_schema,
    )
[docs]
def deserialize_many_from_stream(
    stream: BinaryIO, py_type: Type[Obj], *, format: str, writer_schema: bytes = b"", reader_schema: bytes = b""
) -> Iterator[Obj]:
    """
    Deserialize a file-like object as an iterator over Python objects of a given type from a serialization format
    supported by **py-adapter**

    The format plugin is invoked immediately; conversion of each record into ``py_type`` happens lazily while the
    returned iterator is consumed.

    :param stream: File-like object to deserialize
    :param py_type: The Python class to create an instance from
    :param format: Serialization format as supported by a **py-adapter** plugin, e.g. ``JSON``.
    :param writer_schema: Data schema used to serialize the data with, as JSON bytes.
    :param reader_schema: Data schema to deserialize the data with, as JSON bytes. The reader schema should be
                          compatible with the writer schema.
    :return: Iterator over instances of ``py_type``
    """
    plugin_deserialize_many = py_adapter.plugin.plugin_hook(format, "deserialize_many")
    records = plugin_deserialize_many(
        stream=stream, py_type=py_type, writer_schema=writer_schema, reader_schema=reader_schema
    )
    return (from_basic_type(record, py_type) for record in records)
class _Adapter(abc.ABC):
    """Abstract base for objects that turn one data structure into another"""

    @abc.abstractmethod
    def adapt(self, data: Any) -> Any:
        """
        Turn ``data`` into a different representation; concrete subclasses must override this

        :param data: A valid data structure
        """
        raise NotImplementedError
class _DictAdapter(_Adapter):
    """An adapter to convert a Python object into a dict suitable for Avro serialization"""

    #: Whether to include private object fields in the dict
    incl_private_fields = False
    #: Whether to include keys in dicts starting with an underscore
    incl_private_keys = True

    def __init__(self, datetime_type: Type = datetime.datetime, json_type: Type = str):
        """
        :param datetime_type: The type to convert datetime objects to. Supported types include :class:`int` (timestamp),
                              :class:`str` (ISO-format), and :class:`datetime.datetime` (no conversion).
        :param json_type: The type to convert dataclass "JSON" dict fields to. Fields are identified by having
                          custom meta data ``{"py_adapter": {"logical_type": "json" }}``. If set to ``str``
                          (default), field values are serialized as JSON strings. Set to ``dict`` for no conversion.
        """
        self.datetime_type = datetime_type
        self.json_type = json_type

    def adapt(self, data: Any) -> Basic:
        """
        Convert an object into a (nested) dictionary

        Logic adapted from :mod:`dataclasses` with additional logic added to handle :class:`enum.Enum` instances and
        JSON logical type fields inside data classes.

        :param data: The object to convert
        :return: The converted data structure. Objects without a ``__dict__`` that are not handled explicitly (e.g.
                 numbers, ``None``) are returned as a deep copy.
        """
        if dataclasses.is_dataclass(data):
            return self._adapt_dataclass(data)
        elif isinstance(data, (list, tuple)):
            return [self.adapt(v) for v in data]  # Additional logic: always use list
        elif isinstance(data, dict):
            return self._adapt_dict(data)
        elif isinstance(data, enum.Enum):  # Additional logic
            return data.value
        elif isinstance(data, datetime.datetime):  # Must precede the date check: datetime is a date subclass
            return self._adapt_datetime(data)  # Additional logic
        elif isinstance(data, datetime.date):
            return self._adapt_date(data)  # Additional logic
        elif isinstance(data, str):
            return str(data)  # Additional logic, it might be a string subclass
        elif isinstance(data, uuid.UUID):  # Additional logic
            # TODO: introduce setting for UUID to str conversion, some serializer can work with UUID objects directly
            return str(data)

        # Generic objects: convert their (public) instance attributes
        # TODO: try dict(data) first
        try:
            attributes = vars(data)
        except TypeError:
            # Object has no __dict__: return a copy as-is. Only vars() is guarded here so that TypeErrors raised
            # while converting nested values propagate instead of silently yielding an unconverted object.
            return copy.deepcopy(data)
        return self.adapt(
            {k: v for k, v in attributes.items() if not k.startswith("_") or self.incl_private_fields}
        )  # Additional logic

    def _adapt_dict(self, data: Dict[Any, Any]) -> Record:
        """
        Recursively convert a dict, preserving its (sub)class where possible

        Keys starting with an underscore are dropped unless :attr:`incl_private_keys` is set. Only string keys can be
        private; other keys (e.g. ints) are always kept.
        """
        adapted = {
            self.adapt(k): self.adapt(v)
            for k, v in data.items()
            if self.incl_private_keys or not (isinstance(k, str) and k.startswith("_"))
        }
        dict_type = type(data)
        if dict_type is dict:
            return adapted
        if hasattr(dict_type, "default_factory"):
            # collections.defaultdict takes its default factory, not the items, as first constructor argument
            result = dict_type(getattr(data, "default_factory"))
            result.update(adapted)
            return result
        # Rebuild from a mapping (not a stream of pairs) so that e.g. collections.Counter copies counts correctly
        return dict_type(adapted)

    @staticmethod
    def _epoch_millis(data: datetime.datetime) -> int:
        """
        Return the Unix timestamp of ``data`` in whole milliseconds, dropping any sub-millisecond part

        The whole-second part is obtained from :meth:`datetime.datetime.timestamp` (so naive datetimes are interpreted
        exactly as that method does) and the millisecond part is added with integer arithmetic. Computing
        ``int(data.timestamp() * 1e3)`` instead can be off by one millisecond due to floating point rounding. For
        pre-1970 datetimes with a sub-millisecond part, the result rounds towards the earlier instant.
        """
        whole_seconds = int(data.replace(microsecond=0).timestamp())  # Exact: integral float
        return whole_seconds * 1000 + data.microsecond // 1000

    def _adapt_date(self, data: datetime.date) -> Union[Primitives, Logicals]:
        """Convert a date object according to :attr:`datetime_type`"""
        if self.datetime_type == int:
            start_of_day = datetime.datetime.combine(data, datetime.time(), tzinfo=datetime.timezone.utc)
            return self._epoch_millis(start_of_day)  # Hardcode to timestamp in milliseconds for now
        elif self.datetime_type == str:
            return data.isoformat()
        else:
            return copy.deepcopy(data)

    def _adapt_datetime(self, data: datetime.datetime) -> Union[Primitives, Logicals]:
        """Convert a datetime object according to :attr:`datetime_type`"""
        if self.datetime_type == int:
            return self._epoch_millis(data)  # Hardcode to timestamp in milliseconds for now
        elif self.datetime_type == str:
            return data.isoformat()
        else:
            return copy.deepcopy(data)

    def _adapt_dataclass(self, data: Any) -> Record:
        """
        Recursively convert a dataclass object with all fields

        Fields whose metadata marks them with logical type ``json`` are encoded as JSON strings when
        :attr:`json_type` is ``str``; all other fields are converted recursively with :meth:`adapt`.
        """
        result: Dict[str, Basic] = {}
        for f in dataclasses.fields(data):
            # Additional logic
            field_meta = f.metadata.get(__package__)  # Retrieve dataclass field meta data relevant to current pkg
            field_value = getattr(data, f.name)
            if field_meta and field_meta.get("logical_type", "") == "json" and self.json_type == str:
                # If this is a Python dict field with logical type JSON, encode data as JSON
                result[f.name] = orjson.dumps(field_value).decode(encoding="utf-8")
            else:
                # Otherwise recursively adapt the field value as normal
                result[f.name] = self.adapt(field_value)
        return result
class _ObjectAdapter(_Adapter):
    """An adapter to convert a dict into a Python object using an Avro schema"""

    #: Avro schema attribute to use for importing Python classes. Note that the ``namespace`` attribute is always used
    #: as a fallback and in Python-only environments it is recommended to set the Avro namespace as the Python package/
    #: module name.
    module_schema_attribute: str = "pyModule"
    #: Avro schema attribute for constructing Python objects (e.g. string subclasses) for Avro string primitive schemas.
    named_string_attribute: str = "namedString"

    def __init__(self, schema: avro.schema.Schema):
        """
        An adapter to convert a dict into a Python object using an Avro schema

        :param schema: The Avro schema to be used to adapt the data structure.
        """
        self.schema = schema

    @classmethod
    def for_py_type(cls, py_type: Type) -> "_ObjectAdapter":
        """
        An adapter to convert a dict into a Python object of a given class

        :param py_type: The Python class to return an object adapter for
        :raises TypeError: If py-avro-schema cannot generate a schema for ``py_type``
        """
        try:
            # TODO: expose options as necessary
            schema = avro.schema.parse(
                pas.generate(py_type, options=pas.Option.LOGICAL_JSON_STRING | pas.Option.MILLISECONDS).decode("utf-8")
            )
        except pas.TypeNotSupportedError as e:
            raise TypeError(
                f"{py_type} not supported by py-adapter since it is not supported by py-avro-schema"
            ) from e
        return cls(schema)

    def adapt(self, data: Basic) -> Any:
        """
        Parse a data structure and return a Python object

        :param data: Any valid data structure that can be parsed using :attr:`schema`.
        """
        return self._parse(data, self.schema)

    def _parse(self, data: Basic, schema: avro.schema.Schema) -> Any:
        """Main parser method, called recursively; dispatches on the exact schema class"""
        # TODO: improve type hints, second callable argument must be a schema object
        parsers_by_schema: Dict[Type[avro.schema.Schema], Callable[[Any, Any], Any]] = {
            avro.schema.ArraySchema: self._parse_array,
            avro.schema.EnumSchema: self._parse_enum,
            avro.schema.UnionSchema: self._parse_union,
            avro.schema.RecordSchema: self._parse_record,
            avro.schema.TimestampMillisSchema: self._parse_timestamp_millis,
            avro.schema.DateSchema: self._parse_date,
            avro.schema.PrimitiveSchema: self._parse_primitive,
            avro.schema.UUIDSchema: self._parse_uuid,
        }
        parser = parsers_by_schema.get(type(schema))
        if parser:
            return parser(data, schema)
        else:
            return data  # Schema types without a parser are passed through unchanged

    def _parse_primitive(self, data: Primitives, schema: avro.schema.PrimitiveSchema) -> Any:
        """
        Parse primitive data types

        Primitives would typically map 1-to-1 to Python types and Avro logical types would be handled by the Avro
        serializer. However, we define here a custom string logical type "json" which maps to a Python dict type.
        """
        if schema.type == "string" and isinstance(data, str):
            if schema.get_prop("logicalType") == "json":
                try:
                    return orjson.loads(data.encode(encoding="utf-8"))
                except orjson.JSONDecodeError:  # Handling say empty string as input which is not valid JSON
                    # The Python object should infill the default value, e.g. dict or list.
                    # TODO: what happens if the Python object does not have a default?
                    return None
            elif schema.get_prop(self.named_string_attribute):
                # We want this to fail if named_string class is not importable
                dotted_name = cast(str, schema.get_prop(self.named_string_attribute))
                class_ = self._import_attribute(dotted_name)
                return class_(data)  # Instantiate class, which must be a subclass of str
        return data  # Avro serializer handles the rest

    def _parse_uuid(self, data: Union[str, uuid.UUID], schema: avro.schema.UUIDSchema) -> Union[None, uuid.UUID]:
        """
        Parse a UUID string as a Python UUID object
        """
        # TODO: introduce UUID to str conversion setting so we know whether the deserializer for a given format can
        #       handle UUID objects itself.
        if isinstance(data, uuid.UUID):
            return data
        elif data:
            return uuid.UUID(data)
        else:
            # Accept an empty string and return None such that the Python class can initialize a value. Any
            # malformed strings would raise ValueErrors if they can't be cast as a UUID.
            return None

    def _parse_date(self, data: Union[datetime.date, int, str], schema: avro.schema.DateSchema) -> datetime.date:
        """
        Parse the int logical type "date".

        This is actually handled by the Avro deserializer itself, but we also want to support raw integers that were
        not deserialized to a date, e.g. if we're deserializing from JSON. Handle that case directly here.
        """
        if isinstance(data, int):
            # Assume we have serialized as a timestamp in milliseconds. Now that is NOT how we would have serialized it
            # if it was Avro. So this is useful only in combination with serializing using ``datetime_type=int``.
            return datetime.datetime.fromtimestamp(data / 1e3, tz=datetime.timezone.utc).date()
        elif isinstance(data, str):
            return dateutil.parser.isoparse(data).date()
        else:
            # Deserialization to date object handled by Avro deserializer
            return data

    def _parse_timestamp_millis(
        self, data: Union[datetime.datetime, int, str], schema: avro.schema.TimestampMillisSchema
    ) -> datetime.datetime:
        """
        Parse the long logical type "millis".

        This is actually handled by the Avro deserializer itself, but we also want to support raw integers that were
        not deserialized to a datetime, e.g. if we're deserializing from JSON. Handle that case directly here.
        """
        if isinstance(data, int):
            # Assume we have serialized as a timestamp in milliseconds. This is useful only in combination with
            # serializing using ``datetime_type=int``.
            return datetime.datetime.fromtimestamp(data / 1e3, tz=datetime.timezone.utc)
        elif isinstance(data, str):
            return dateutil.parser.isoparse(data)
        else:
            # Deserialization to datetime object handled by Avro deserializer
            return data

    def _parse_array(self, data: Array, schema: avro.schema.ArraySchema) -> List[Any]:
        """Parse an array/list schema by parsing each element with the items schema"""
        return [self._parse(elem, schema.items) for elem in data]

    def _parse_union(self, data: Basic, schema: avro.schema.UnionSchema) -> Any:
        """
        Parse a union schema trying the union branches one by one until the data fits

        :raises DataTypeError: If none of the branch schemas could parse the data
        """
        # Rank the branch schemas by best matching then by position in the union
        ranked = sorted(
            (-py_adapter._schema.match(s, data), position)  # Negative for best matched first
            for position, s in enumerate(schema.schemas)
        )
        for negative_score, position in ranked:
            if negative_score == 0:
                continue  # Branch schema does not match the data at all
            try:
                # Recursively parse data using best matched schema
                return self._parse(data, schema.schemas[position])
            except (TypeError, KeyError):
                # If record parsing fails, we try the next best schema
                continue
        # If none of the schemas matched the data at all
        raise DataTypeError(data, schema)

    def _parse_enum(self, data: str, schema: avro.schema.EnumSchema) -> Any:
        """Parse an enum schema"""
        enum_class = self._data_class(schema)
        if enum_class:
            return enum_class(data)
        else:  # TODO: remove this logic once we do proper writer vs reader schema resolution
            return None

    def _parse_record(self, data: Record, schema: avro.schema.RecordSchema) -> Any:
        """
        Parse a record/object schema

        This assumes objects satisfy the interface of :class:`dataclasses.dataclass`. Several strategies ("factories")
        to construct the object are tried in turn and the first one that succeeds wins.

        :raises TypeError: (or :class:`KeyError`) The error raised by the last factory if none of them succeeded
        """
        data_class = self._data_class(schema)
        if not data_class:  # TODO: remove this logic once we do proper writer vs reader schema resolution
            return None
        obj_kwargs = {
            field.name: self._parse(data[field.name], field.type) for field in schema.fields if field.name in data
        }
        factories = [
            self._obj_using_init,  # For proper dataclasses, this should work always
            self._obj_set_attrs,  # If a consumer has old dataclass code, there may be additional fields in data
            self._obj_init_first,  # Slow, use as last resort
        ]
        errors: List[Exception] = []
        for factory in factories:
            try:
                # Return as soon as a factory succeeds. Success must not be judged by the object's truthiness: a valid
                # object may be falsy, e.g. if it defines __len__ or __bool__.
                return factory(data_class, obj_kwargs)
            except (TypeError, KeyError) as e:  # Are there other errors we would need to catch?
                errors.append(e)  # Factory didn't work, move on to the next one
        raise errors[-1] from None  # None of the factories worked, raise the last error

    @staticmethod
    def _obj_using_init(data_class: Type, kwargs: Dict[str, Basic]) -> Any:
        """Create an object by passing all fields into the constructor method"""
        obj = data_class(**kwargs)
        return obj

    def _obj_set_attrs(self, data_class: Type, kwargs: Dict[str, Basic]) -> Any:
        """Create an object by setting all fields directly after object instantiation"""
        obj = data_class()
        self._set_obj_fields(obj, kwargs)
        return obj

    def _obj_init_first(self, data_class: Type, kwargs: Dict[str, Basic]) -> Any:
        """
        Create an object by inspecting the constructor signature and passing all available parameters and setting other
        fields afterwards
        """
        # First inspect the data_class.__init__, find keyword only params vs the rest
        pos_args_names, kw_arg_names = _constructor_params(data_class)
        # Any other params that are in the payload but not in data_class.__init__
        remaining_arg_names = [name for name in kwargs if name not in pos_args_names and name not in kw_arg_names]
        # Take the corresponding parameter values from the payload
        pos_args = [kwargs[name] for name in pos_args_names if name in kwargs]
        kw_args = {name: kwargs[name] for name in kw_arg_names if name in kwargs}
        remaining_args = {name: kwargs[name] for name in remaining_arg_names}
        # Construct the object using data_class.__init__
        obj = data_class(*pos_args, **kw_args)
        # If possible set the remaining parameters directly as instance attributes
        self._set_obj_fields(obj, remaining_args)
        return obj

    @staticmethod
    def _set_obj_fields(obj, kwargs) -> None:
        """Set fields on an object if the object has the field"""
        for field, value in kwargs.items():
            if hasattr(obj, field):  # Set fields that exist only
                setattr(obj, field, value)

    def _data_class(self, schema: Union[avro.schema.RecordSchema, avro.schema.EnumSchema]) -> Optional[Type]:
        """
        Return the corresponding Python class for a schema

        This is relevant only for record and enum schemas. Python object is imported from a package taken from the
        schema's namespace or, optionally, from an schema attribute like ``pyModule``.

        :return: The class, or ``None`` (with a logged warning) if an AttributeError occurs while looking it up
        """
        module_name = cast(str, schema.props.get(self.module_schema_attribute, schema.namespace))
        try:
            return getattr(importlib.import_module(module_name), schema.name)
        except AttributeError:  # TODO: remove this logic once we do proper writer vs reader schema resolution
            logger.warning("Failed to import class '%s.%s'", module_name, schema.name)
            return None

    @staticmethod
    def _import_attribute(dotted_name: str) -> Any:
        """Import and return attribute from a module, e.g. a class"""
        module_name, class_name = dotted_name.rsplit(".", 1)
        module = importlib.import_module(module_name)
        return getattr(module, class_name)
@memoization.cached(max_size=100)
def _constructor_params(data_class):
    """
    Inspect __init__'s signature for a class and return (positional only params, other params)

    :param data_class: The class whose ``__init__`` to inspect
    :return: Tuple of two lists of parameter names: the positional-only parameters, and all other parameters that can
             be passed by keyword. The leading instance parameter (``self``) and variadic parameters (``*args``,
             ``**kwargs``) are excluded from both lists.
    """
    params = list(inspect.signature(data_class.__init__).parameters.values())
    # Drop the instance parameter by position, whatever its kind: it may itself be positional-only, e.g.
    # ``object.__init__(self, /, *args, **kwargs)`` or ``def __init__(self, a, /, b)``. Slicing also copes with an
    # (unusual) empty signature instead of raising IndexError.
    params = params[1:]
    pos_args_names = [p.name for p in params if p.kind == inspect.Parameter.POSITIONAL_ONLY]
    # Only parameters that can actually be passed by name; ``*args``/``**kwargs`` cannot be addressed as ``args=...``
    keyword_kinds = (inspect.Parameter.POSITIONAL_OR_KEYWORD, inspect.Parameter.KEYWORD_ONLY)
    kw_arg_names = [p.name for p in params if p.kind in keyword_kinds]
    return pos_args_names, kw_arg_names
[docs]
class DataTypeError(TypeError):
    """Raised when data is not compatible with a schema"""

    def __init__(self, data: Any, schema: avro.schema.Schema) -> None:
        """
        Data not compatible with the schema error

        While constructing the exception, an error is logged and the schema match is re-run with debug validation
        switched on, so that validation details are printed to standard output (not via the logger).

        :param data: The data that does not fit the schema
        :param schema: The Avro schema the data was checked against
        """
        saved_debug_flag = py_adapter._schema._DEBUG_VALIDATE
        py_adapter._schema._DEBUG_VALIDATE = True  # Temporarily patch avro package into verbose validation
        try:
            logger.error("Data '%s' is not compatible with schema %s. Validation output:", data, schema)
            py_adapter._schema.match(schema, data)  # Side effect only: prints validation info
            message = "Data not compatible with schema."
            super().__init__(message)
        finally:
            # Restore the previous flag even if matching raised
            py_adapter._schema._DEBUG_VALIDATE = saved_debug_flag