Source code for iso8583.decoder

from typing import Tuple, Union

__all__ = ["decode", "DecodeError"]


class DecodeError(ValueError):
    r"""Subclass of ValueError that describes ISO8583 decoding error.

    The exception text is built as ``"{msg}: field {field} pos {pos}"``;
    the individual pieces stay available as attributes.

    Attributes
    ----------
    msg : str
        The unformatted error message
    s : bytes or bytearray
        The ISO8583 bytes instance being parsed
    doc_dec : dict
        Dict containing partially decoded ISO8583 data
    doc_enc : dict
        Dict containing partially encoded ISO8583 data
    pos : int
        The start index where ISO8583 bytes data failed parsing
    field : str
        The ISO8583 field where parsing failed
    """

    def __init__(
        self,
        msg: str,
        s: Union[bytes, bytearray],
        doc_dec: dict,
        doc_enc: dict,
        pos: int,
        field: str,
    ):
        errmsg = f"{msg}: field {field} pos {pos}"
        super().__init__(errmsg)
        self.msg = msg
        self.s = s
        self.doc_dec = doc_dec
        self.doc_enc = doc_enc
        self.field = field
        self.pos = pos

    def __reduce__(self):
        # The default exception reduction would replay only ``self.args``
        # (the single formatted message), which does not match the
        # six-argument constructor. Return the original arguments instead
        # so that pickling and copying rebuild an equivalent instance.
        return (
            self.__class__,
            (self.msg, self.s, self.doc_dec, self.doc_enc, self.pos, self.field),
        )
def decode(s: Union[bytes, bytearray], spec: dict) -> Tuple[dict, dict]:
    r"""Deserialize a bytes or bytearray instance containing
    ISO8583 data to a Python dict.

    Parameters
    ----------
    s : bytes or bytearray
        Encoded ISO8583 data
    spec : dict
        A Python dict defining ISO8583 specification.
        See :mod:`iso8583.specs` module for examples.

    Returns
    -------
    doc_dec : dict
        Dict containing decoded ISO8583 data
    doc_enc : dict
        Dict containing encoded ISO8583 data

    Raises
    ------
    DecodeError
        An error decoding ISO8583 bytearray
    TypeError
        `s` must be a bytes or bytearray instance

    Examples
    --------
    >>> import pprint
    >>> import iso8583
    >>> from iso8583.specs import default_ascii as spec
    >>> s = b"02004010100000000000161234567890123456123456111"
    >>> doc_dec, doc_enc = iso8583.decode(s, spec)
    >>> pprint.pprint(doc_dec)
    {'12': '123456',
     '2': '1234567890123456',
     '20': '111',
     'bm': {2, 12, 20},
     'p': '4010100000000000',
     't': '0200'}
    """
    if not isinstance(s, (bytes, bytearray)):
        raise TypeError(
            f"the ISO8583 data must be bytes or bytearray, not {s.__class__.__name__}"
        )

    doc_dec = {"bm": set()}
    doc_enc = {"bm": set()}
    idx = 0

    # Fixed preamble: optional header, message type, then the bitmap(s).
    idx = _decode_header(s, doc_dec, doc_enc, idx, spec)
    idx = _decode_type(s, doc_dec, doc_enc, idx, spec)
    idx = _decode_bitmaps(s, doc_dec, doc_enc, idx, spec)

    # Create the variable in case the bitmap set is empty
    # and there is extra data afterwards.
    # Set field to the last mandatory one: primary bitmap.
    f_id = "p"

    # Fields appear on the wire in ascending field-number order.
    for field_no in sorted(doc_dec["bm"]):
        f_id = str(field_no)
        # Secondary bitmap is already decoded in _decode_bitmaps
        if f_id == "1":
            continue
        idx = _decode_field(s, doc_dec, doc_enc, idx, f_id, spec)

    # Every byte must be consumed. If not, report the last field touched.
    if idx != len(s):
        raise DecodeError(
            "Extra data after last field", s, doc_dec, doc_enc, idx, f_id
        ) from None

    return doc_dec, doc_enc
#
# Private interface
#


def _decode_header(
    s: Union[bytes, bytearray], doc_dec: dict, doc_enc: dict, idx: int, spec: dict
) -> int:
    r"""Decode ISO8583 header data if present.

    Parameters
    ----------
    s : bytes or bytearray
        Encoded ISO8583 data
    doc_dec : dict
        Dict containing decoded ISO8583 data
    doc_enc : dict
        Dict containing encoded ISO8583 data
    idx : int
        Current index in ISO8583 byte array
    spec : dict
        A Python dict defining ISO8583 specification.
        See :mod:`iso8583.specs` module for examples.

    Returns
    -------
    int
        Index in ISO8583 byte array where parsing of the header ended

    Raises
    ------
    DecodeError
        An error decoding ISO8583 bytearray.
    """
    # Header is not expected according to specifications
    if spec["h"]["max_len"] <= 0:
        return idx

    return _decode_field(s, doc_dec, doc_enc, idx, "h", spec)


def _decode_type(
    s: Union[bytes, bytearray], doc_dec: dict, doc_enc: dict, idx: int, spec: dict
) -> int:
    r"""Decode ISO8583 message type.

    Parameters
    ----------
    s : bytes or bytearray
        Encoded ISO8583 data
    doc_dec : dict
        Dict containing decoded ISO8583 data
    doc_enc : dict
        Dict containing encoded ISO8583 data
    idx : int
        Current index in ISO8583 byte array
    spec : dict
        A Python dict defining ISO8583 specification.
        See :mod:`iso8583.specs` module for examples.

    Returns
    -------
    int
        Index in ISO8583 byte array where parsing of message type ended

    Raises
    ------
    DecodeError
        An error decoding ISO8583 bytearray.
    """
    data_enc = spec["t"]["data_enc"]

    # Message type is a set length in ISO8583:
    # 2 bytes when binary ("b"), otherwise 4 encoded characters.
    f_len = 2 if data_enc == "b" else 4
    raw = s[idx : idx + f_len]

    # Record what was seen before validating, so that a DecodeError
    # carries the partial data.
    doc_dec["t"] = ""
    doc_enc["t"] = {"len": b"", "data": bytes(raw)}

    if len(raw) != f_len:
        raise DecodeError(
            f"Field data is {len(raw)} bytes, expecting {f_len}",
            s,
            doc_dec,
            doc_enc,
            idx,
            "t",
        ) from None

    # Deliberately broad: any failure (bad bytes, unknown codec, bad spec
    # value) is reported to the caller as a DecodeError.
    try:
        if data_enc == "b":
            doc_dec["t"] = raw.hex().upper()
        else:
            doc_dec["t"] = raw.decode(data_enc)
    except Exception as e:
        raise DecodeError(
            f"Failed to decode ({e})", s, doc_dec, doc_enc, idx, "t"
        ) from None

    return idx + f_len


def _decode_one_bitmap(
    s: Union[bytes, bytearray],
    doc_dec: dict,
    doc_enc: dict,
    idx: int,
    f_id: str,
    spec: dict,
) -> Tuple[int, bytes]:
    r"""Decode a single 64-bit ISO8583 bitmap (primary "p" or secondary "1").

    Parameters
    ----------
    s : bytes or bytearray
        Encoded ISO8583 data
    doc_dec : dict
        Dict containing decoded ISO8583 data
    doc_enc : dict
        Dict containing encoded ISO8583 data
    idx : int
        Index in ISO8583 byte array where the bitmap starts
    f_id : str
        Bitmap field ID: "p" or "1"
    spec : dict
        A Python dict defining ISO8583 specification.

    Returns
    -------
    int
        Index in ISO8583 byte array where parsing of the bitmap ended
    bytes
        The 8 raw bitmap bytes

    Raises
    ------
    DecodeError
        An error decoding ISO8583 bytearray.
    """
    data_enc = spec[f_id]["data_enc"]

    # A bitmap is a set length in ISO8583:
    # 8 bytes when binary ("b"), otherwise 16 encoded hex characters.
    f_len = 8 if data_enc == "b" else 16
    raw = bytes(s[idx : idx + f_len])

    doc_dec[f_id] = ""
    doc_enc[f_id] = {"len": b"", "data": raw}

    if len(raw) != f_len:
        raise DecodeError(
            f"Field data is {len(raw)} bytes, expecting {f_len}",
            s,
            doc_dec,
            doc_enc,
            idx,
            f_id,
        ) from None

    try:
        if data_enc == "b":
            doc_dec[f_id] = raw.hex().upper()
            bm = raw
        else:
            doc_dec[f_id] = raw.decode(data_enc)
            bm = bytes.fromhex(doc_dec[f_id])
    except Exception as e:
        raise DecodeError(
            f"Failed to decode ({e})", s, doc_dec, doc_enc, idx, f_id
        ) from None

    # bytes.fromhex() silently skips ASCII whitespace, so 16 characters
    # containing spaces can yield fewer than 8 bytes. Reject that rather
    # than continuing with a truncated bitmap.
    if len(bm) != 8:
        raise DecodeError(
            f"Failed to decode (bitmap is {len(bm)} bytes, expecting 8)",
            s,
            doc_dec,
            doc_enc,
            idx,
            f_id,
        ) from None

    return idx + f_len, bm


def _decode_bitmaps(
    s: Union[bytes, bytearray], doc_dec: dict, doc_enc: dict, idx: int, spec: dict
) -> int:
    r"""Decode ISO8583 primary and secondary bitmaps.

    On success ``doc_dec["bm"]`` and ``doc_enc["bm"]`` are replaced with
    (separate) sets of the enabled field numbers.

    Parameters
    ----------
    s : bytes or bytearray
        Encoded ISO8583 data
    doc_dec : dict
        Dict containing decoded ISO8583 data
    doc_enc : dict
        Dict containing encoded ISO8583 data
    idx : int
        Current index in ISO8583 byte array
    spec : dict
        A Python dict defining ISO8583 specification.
        See :mod:`iso8583.specs` module for examples.

    Returns
    -------
    int
        Index in ISO8583 byte array where parsing of bitmaps ended

    Raises
    ------
    DecodeError
        An error decoding ISO8583 bytearray.
    """
    idx, bm = _decode_one_bitmap(s, doc_dec, doc_enc, idx, "p", spec)

    # Bits are numbered 1..64, most significant bit of the first byte first.
    doc_dec["bm"] = {
        byte_idx * 8 + bit
        for byte_idx, byte in enumerate(bm)
        for bit in range(1, 9)
        if byte >> (8 - bit) & 1
    }
    doc_enc["bm"] = doc_dec["bm"].copy()

    # No need to produce secondary bitmap if it's not required
    if 1 not in doc_dec["bm"]:
        return idx

    # Decode secondary bitmap; its bits map to fields 65..128.
    idx, bm = _decode_one_bitmap(s, doc_dec, doc_enc, idx, "1", spec)

    doc_dec["bm"].update(
        64 + byte_idx * 8 + bit
        for byte_idx, byte in enumerate(bm)
        for bit in range(1, 9)
        if byte >> (8 - bit) & 1
    )
    doc_enc["bm"] = doc_dec["bm"].copy()

    return idx


def _decode_field(
    s: Union[bytes, bytearray],
    doc_dec: dict,
    doc_enc: dict,
    idx: int,
    f_id: str,
    spec: dict,
) -> int:
    r"""Decode ISO8583 individual fields.

    Parameters
    ----------
    s : bytes or bytearray
        Encoded ISO8583 data
    doc_dec : dict
        Dict containing decoded ISO8583 data
    doc_enc : dict
        Dict containing encoded ISO8583 data
    idx : int
        Current index in ISO8583 byte array
    f_id : str
        Field ID to be decoded
    spec : dict
        A Python dict defining ISO8583 specification.
        See :mod:`iso8583.specs` module for examples.

    Returns
    -------
    int
        Index in ISO8583 byte array where parsing of the field ended

    Raises
    ------
    DecodeError
        An error decoding ISO8583 bytearray.
    """
    f_spec = spec[f_id]
    len_type = f_spec["len_type"]
    raw_len = s[idx : idx + len_type]

    doc_dec[f_id] = ""
    doc_enc[f_id] = {"len": bytes(raw_len), "data": b""}

    if len(raw_len) != len_type:
        raise DecodeError(
            f"Field length is {len(raw_len)} bytes wide, expecting {len_type}",
            s,
            doc_dec,
            doc_enc,
            idx,
            f_id,
        ) from None

    # Parse field length if present.
    # For fixed-length fields max_len is the length.
    if len_type == 0:
        f_len = f_spec["max_len"]
    else:
        try:
            if f_spec["len_enc"] == "b":
                # Binary length: the hex digits are read as a decimal
                # number (base 10).
                f_len = int(raw_len.hex(), 10)
            else:
                f_len = int(raw_len.decode(f_spec["len_enc"]), 10)
        except Exception as e:
            raise DecodeError(
                f"Failed to decode length ({e})", s, doc_dec, doc_enc, idx, f_id
            ) from None

    if f_len > f_spec["max_len"]:
        raise DecodeError(
            f"Field data is {f_len} bytes, larger than maximum {f_spec['max_len']}",
            s,
            doc_dec,
            doc_enc,
            idx,
            f_id,
        ) from None

    idx += len_type

    # Do not parse zero-length field
    if f_len == 0:
        return idx

    # Parse field data
    data = bytes(s[idx : idx + f_len])
    doc_enc[f_id]["data"] = data

    if len(data) != f_len:
        raise DecodeError(
            f"Field data is {len(data)} bytes, expecting {f_len}",
            s,
            doc_dec,
            doc_enc,
            idx,
            f_id,
        ) from None

    try:
        if f_spec["data_enc"] == "b":
            doc_dec[f_id] = data.hex().upper()
        else:
            doc_dec[f_id] = data.decode(f_spec["data_enc"])
    except Exception as e:
        raise DecodeError(
            f"Failed to decode ({e})", s, doc_dec, doc_enc, idx, f_id
        ) from None

    return idx + f_len