Source code for iso8583.decoder

from typing import Any, Dict, Mapping, Set, Tuple, Type, Union

__all__ = ["decode", "DecodeError"]

DecodedDict = Dict[str, str]
EncodedDict = Dict[str, Dict[str, bytes]]
SpecDict = Mapping[str, Mapping[str, Any]]


[docs] class DecodeError(ValueError): r"""Subclass of ValueError that describes ISO8583 decoding error. Attributes ---------- msg : str The unformatted error message s : bytes or bytearray The ISO8583 bytes instance being parsed doc_dec : dict Dict containing partially decoded ISO8583 data doc_enc : dict Dict containing partially encoded ISO8583 data pos : int The start index where ISO8583 bytes data failed parsing field : str The ISO8583 field where parsing failed """ def __init__( self, msg: str, s: Union[bytes, bytearray], doc_dec: DecodedDict, doc_enc: EncodedDict, pos: int, field: str, ): errmsg = f"{msg}: field {field} pos {pos}" ValueError.__init__(self, errmsg) self.msg = msg self.s = s self.doc_dec = doc_dec self.doc_enc = doc_enc self.field = field self.pos = pos def __reduce__( self, ) -> Tuple[ Type["DecodeError"], Tuple[str, Union[bytes, bytearray], DecodedDict, EncodedDict, int, str], ]: return ( self.__class__, (self.msg, self.s, self.doc_dec, self.doc_enc, self.pos, self.field), )
[docs] def decode( s: Union[bytes, bytearray], spec: SpecDict ) -> Tuple[DecodedDict, EncodedDict]: r"""Deserialize a bytes or bytearray instance containing ISO8583 data to a Python dict. Parameters ---------- s : bytes or bytearray Encoded ISO8583 data spec : dict A Python dict defining ISO8583 specification. See :mod:`iso8583.specs` module for examples. Returns ------- doc_dec : dict Dict containing decoded ISO8583 data doc_enc : dict Dict containing encoded ISO8583 data Raises ------ DecodeError An error decoding ISO8583 bytearray TypeError `s` must be a bytes or bytearray instance Examples -------- >>> import pprint >>> import iso8583 >>> from iso8583.specs import default_ascii as spec >>> s = b"02004010100000000000161234567890123456123456111" >>> doc_dec, doc_enc = iso8583.decode(s, spec) >>> pprint.pprint(doc_dec) {'12': '123456', '2': '1234567890123456', '20': '111', 'p': '4010100000000000', 't': '0200'} """ if not isinstance(s, (bytes, bytearray)): raise TypeError( f"Encoded ISO8583 data must be bytes or bytearray, not {s.__class__.__name__}" ) doc_dec: DecodedDict = {} doc_enc: EncodedDict = {} fields: Set[int] = set() idx = 0 idx = _decode_header(s, doc_dec, doc_enc, idx, spec) idx = _decode_type(s, doc_dec, doc_enc, idx, spec) idx = _decode_bitmap(s, doc_dec, doc_enc, idx, "p", spec, fields) # No need to produce secondary bitmap if it's not required if 1 in fields: idx = _decode_bitmap(s, doc_dec, doc_enc, idx, "1", spec, fields) # `field_key` can be used to throw an exception after the loop. # So, create it here in case the `fields` set is empty # and never enters the loop to create the variable. # Set `field_key` to the last mandatory one: primary bitmap. field_key = "p" for field_key in [str(i) for i in sorted(fields)]: # Secondary bitmap is already decoded if field_key == "1": continue idx = _decode_field(s, doc_dec, doc_enc, idx, field_key, spec[field_key]) if idx != len(s): raise DecodeError( "Extra data after last field", s, doc_dec, doc_enc, idx, field_key ) return doc_dec, doc_enc
# # Private interface # _FieldSpecDict = Mapping[str, Any] def _decode_header( s: Union[bytes, bytearray], doc_dec: DecodedDict, doc_enc: EncodedDict, idx: int, spec: SpecDict, ) -> int: r"""Decode ISO8583 header data if present. Parameters ---------- s : bytes or bytearray Encoded ISO8583 data doc_dec : dict Dict containing decoded ISO8583 data doc_enc : dict Dict containing encoded ISO8583 data idx : int Current index in ISO8583 byte array spec : dict A Python dict defining ISO8583 specification. See :mod:`iso8583.specs` module for examples. Returns ------- int Index in ISO8583 byte array where parsing of the header ended Raises ------ DecodeError An error decoding ISO8583 bytearray. """ # Header is not expected according to specifications if spec["h"]["max_len"] <= 0: return idx return _decode_field(s, doc_dec, doc_enc, idx, "h", spec["h"]) def _decode_type( s: Union[bytes, bytearray], doc_dec: DecodedDict, doc_enc: EncodedDict, idx: int, spec: SpecDict, ) -> int: r"""Decode ISO8583 message type. Parameters ---------- s : bytes or bytearray Encoded ISO8583 data doc_dec : dict Dict containing decoded ISO8583 data doc_enc : dict Dict containing encoded ISO8583 data idx : int Current index in ISO8583 byte array spec : dict A Python dict defining ISO8583 specification. See :mod:`iso8583.specs` module for examples. Returns ------- int Index in ISO8583 byte array where parsing of message type ended Raises ------ DecodeError An error decoding ISO8583 bytearray. """ # Message type is a set length in ISO8583 if spec["t"]["data_enc"] == "b": expected_field_len = 2 else: expected_field_len = 4 doc_dec["t"] = "" doc_enc["t"] = {"len": b"", "data": bytes(s[idx : idx + expected_field_len])} if len(s[idx : idx + expected_field_len]) != expected_field_len: raise DecodeError( f"Field data is {len(s[idx:idx + expected_field_len])} bytes, expecting {expected_field_len}", s, doc_dec, doc_enc, idx, "t", ) if spec["t"]["data_enc"] == "b": doc_dec["t"] = s[idx : idx + expected_field_len].hex().upper() else: _decode_text_data( s, s[idx : idx + expected_field_len], idx, doc_dec, doc_enc, "t", spec["t"], ) return idx + expected_field_len def _decode_bitmap( s: Union[bytes, bytearray], doc_dec: DecodedDict, doc_enc: EncodedDict, idx: int, field_key: str, spec: SpecDict, fields: Set[int], ) -> int: r"""Decode ISO8583 a bitmap. Parameters ---------- s : bytes or bytearray Encoded ISO8583 data doc_dec : dict Dict containing decoded ISO8583 data doc_enc : dict Dict containing encoded ISO8583 data idx : int Current index in ISO8583 byte array spec : dict A Python dict defining ISO8583 specification. See :mod:`iso8583.specs` module for examples. field_key : str Field ID to be decoded fields: set Will be populated with enabled field numbers Returns ------- int Index in ISO8583 byte array where parsing of bitmaps ended Raises ------ DecodeError An error decoding ISO8583 bytearray. """ # Primary/Secondary bitmap is a set length in ISO8583 if spec[field_key]["data_enc"] == "b": expected_field_len = 8 else: expected_field_len = 16 offset = 0 if field_key == "p" else 64 doc_dec[field_key] = "" doc_enc[field_key] = {"len": b"", "data": bytes(s[idx : idx + expected_field_len])} if len(s[idx : idx + expected_field_len]) != expected_field_len: raise DecodeError( f"Field data is {len(s[idx:idx + expected_field_len])} bytes, expecting {expected_field_len}", s, doc_dec, doc_enc, idx, field_key, ) if spec[field_key]["data_enc"] == "b": doc_dec[field_key] = s[idx : idx + expected_field_len].hex().upper() bitmap = s[idx : idx + expected_field_len] else: _decode_text_data( s, s[idx : idx + expected_field_len], idx, doc_dec, doc_enc, field_key, spec[field_key], ) try: bitmap = bytes.fromhex(doc_dec[field_key]) except Exception: raise DecodeError( "Failed to decode field, non-hex data", s, doc_dec, doc_enc, idx, field_key, ) from None fields.update( [ offset + byte_idx * 8 + bit for bit in range(1, 9) for byte_idx, byte in enumerate(bitmap) if byte >> (8 - bit) & 1 ] ) return idx + expected_field_len def _decode_field( s: Union[bytes, bytearray], doc_dec: DecodedDict, doc_enc: EncodedDict, idx: int, field_key: str, field_spec: _FieldSpecDict, ) -> int: r"""Decode ISO8583 individual fields. Parameters ---------- s : bytes or bytearray Encoded ISO8583 data doc_dec : dict Dict containing decoded ISO8583 data doc_enc : dict Dict containing encoded ISO8583 data idx : int Current index in ISO8583 byte array field_key : str Field ID to be decoded field_spec : dict A Python dict defining ISO8583 specification for this field. See :mod:`iso8583.specs` module for examples. Returns ------- int Index in ISO8583 byte array where parsing of the field ended Raises ------ DecodeError An error decoding ISO8583 bytearray. """ len_type = field_spec["len_type"] # Optional field added in v2.1. Prior specs do not have it. len_count = field_spec.get("len_count", "bytes") doc_dec[field_key] = "" doc_enc[field_key] = {"len": bytes(s[idx : idx + len_type]), "data": b""} if len(s[idx : idx + len_type]) != len_type: raise DecodeError( f"Field length is {len(s[idx:idx + len_type])} bytes wide, expecting {len_type}", s, doc_dec, doc_enc, idx, field_key, ) # Parse field length if present. # For fixed-length fields max_len is the length. if len_type == 0: enc_field_len: int = field_spec["max_len"] # Variable field length else: # Binary length if field_spec["len_enc"] == "b": try: enc_field_len = int.from_bytes( s[idx : idx + len_type], "big", signed=False ) # It does not seem to be possible to hit this unless # it's a type or input parameter error. # However, keeping this, because you just never know. except Exception as e: # pragma: no cover raise DecodeError( f"Failed to decode field length, {e}", s, doc_dec, doc_enc, idx, field_key, ) from None # BCD length elif field_spec["len_enc"] == "bcd": try: enc_field_len = int(s[idx : idx + len_type].hex(), 10) except Exception: raise DecodeError( "Failed to decode field length, invalid BCD data", s, doc_dec, doc_enc, idx, field_key, ) from None # Text length else: try: decoded_length = s[idx : idx + len_type].decode(field_spec["len_enc"]) except LookupError: raise DecodeError( "Failed to decode field length, unknown encoding specified", s, doc_dec, doc_enc, idx, field_key, ) from None except Exception: raise DecodeError( "Failed to decode field length, invalid data", s, doc_dec, doc_enc, idx, field_key, ) from None try: enc_field_len = int(decoded_length) except Exception: raise DecodeError( "Failed to decode field length, non-numeric data", s, doc_dec, doc_enc, idx, field_key, ) from None if enc_field_len > field_spec["max_len"]: raise DecodeError( f"Field data is {enc_field_len} {len_count}, larger than maximum {field_spec['max_len']}", s, doc_dec, doc_enc, idx, field_key, ) idx += len_type # Do not parse zero-length field if enc_field_len == 0: return idx # Encoded field length can be in bytes or half bytes (nibbles) # Convert nibbles to bytes if needed if len_count == "nibbles": if enc_field_len & 1: byte_field_len = (enc_field_len + 1) // 2 else: byte_field_len = enc_field_len // 2 else: byte_field_len = enc_field_len # Parse field data doc_enc[field_key]["data"] = bytes(s[idx : idx + byte_field_len]) if len(doc_enc[field_key]["data"]) != byte_field_len: if len_count == "nibbles": actual_field_len = len(doc_enc[field_key]["data"]) * 2 else: actual_field_len = len(doc_enc[field_key]["data"]) raise DecodeError( f"Field data is {actual_field_len} {len_count}, expecting {enc_field_len}", s, doc_dec, doc_enc, idx, field_key, ) if field_spec["data_enc"] == "b": doc_dec[field_key] = doc_enc[field_key]["data"].hex().upper() if len_count == "nibbles" and enc_field_len & 1: doc_dec[field_key] = _remove_pad_field( s, idx, doc_dec, doc_enc, field_key, field_spec, enc_field_len, ) else: _decode_text_data( s, doc_enc[field_key]["data"], idx, doc_dec, doc_enc, field_key, field_spec, ) return idx + byte_field_len def _remove_pad_field( s: Union[bytes, bytearray], idx: int, doc_dec: DecodedDict, doc_enc: EncodedDict, field_key: str, field_spec: _FieldSpecDict, enc_field_len: int, ) -> str: r"""Remove left or right pad from a BCD or hex field. Parameters ---------- s : bytes or bytearray Encoded ISO8583 data idx : int Current index in ISO8583 byte array doc_dec : dict Dict containing decoded ISO8583 data doc_enc : dict Dict containing encoded ISO8583 data field_key : str Field ID to remove pad from field_spec : dict A Python dict defining ISO8583 specification for this field. See :mod:`iso8583.specs` module for examples. enc_field_len : int Number of nibbles expected in the field Returns ------- str Field data without pad Raises ------ DecodeError An error decoding ISO8583 bytearray. """ pad: str = field_spec.get("left_pad", "")[:1] if len(pad) > 0 and doc_dec[field_key][:1] == pad: return doc_dec[field_key][1:] pad = field_spec.get("right_pad", "")[:1] if len(pad) > 0 and doc_dec[field_key][-1:] == pad: return doc_dec[field_key][:-1] raise DecodeError( f"Field data is {len(doc_dec[field_key])} nibbles, expecting {enc_field_len}", s, doc_dec, doc_enc, idx, field_key, ) def _decode_text_data( s: Union[bytes, bytearray], data: Union[bytes, bytearray], idx: int, doc_dec: DecodedDict, doc_enc: EncodedDict, field_key: str, field_spec: _FieldSpecDict, ) -> None: r"""Decode non-binary field and handle errors if any Parameters ---------- s : bytes or bytearray Encoded ISO8583 data data : bytes or bytearray Encoded ISO8583 data to decode idx : int Current index in ISO8583 byte array doc_dec : dict Dict containing decoded ISO8583 data doc_enc : dict Dict containing encoded ISO8583 data field_key : str Field ID to remove pad from field_spec : dict A Python dict defining ISO8583 specification for this field. See :mod:`iso8583.specs` module for examples. enc_field_len : int Number of nibbles expected in the field Raises ------ DecodeError An error decoding ISO8583 bytearray. """ try: doc_dec[field_key] = data.decode(field_spec["data_enc"]) except LookupError: raise DecodeError( "Failed to decode field, unknown encoding specified", s, doc_dec, doc_enc, idx, field_key, ) from None except Exception: raise DecodeError( "Failed to decode field, invalid data", s, doc_dec, doc_enc, idx, field_key, ) from None