Skip to content

Core API Reference

Auto-generated API documentation for the core module.

iso8583sim.core.types

Core data types, enumerations, and field definitions.

types

ISO8583Version

Bases: Enum

ISO 8583 protocol versions

FieldType

Bases: Enum

Field data types in ISO 8583

__str__

__str__() -> str

String representation returns the value

Source code in iso8583sim/core/types.py
28
29
30
def __str__(self) -> str:
    """String representation returns the value"""
    return self.value

CardNetwork

Bases: Enum

Card network identifiers

MessageClass

Bases: Enum

Message class identifiers (position 2 of MTI)

MessageFunction

Bases: Enum

Message function identifiers (position 3 of MTI)

MessageOrigin

Bases: Enum

Message origin identifiers (position 4 of MTI)

FieldDefinition dataclass

FieldDefinition(field_type: FieldType, max_length: int, description: str, field_number: int | None = None, encoding: str = 'ascii', min_length: int | None = None, padding_char: str | None = None, padding_direction: str = 'left')

Definition of an ISO 8583 field

__post_init__

__post_init__()

Validate field definition attributes after initialization

Source code in iso8583sim/core/types.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
def __post_init__(self):
    """Validate field definition attributes after initialization"""
    # Validate field type
    if not isinstance(self.field_type, FieldType):
        raise ValueError(f"Invalid field type: {self.field_type}")

    # Validate max_length
    if not isinstance(self.max_length, int) or self.max_length <= 0:
        raise ValueError(f"Invalid max length: {self.max_length}")

    # Validate min_length if provided
    if self.min_length is not None:
        if not isinstance(self.min_length, int) or self.min_length < 0:
            raise ValueError(f"Invalid min length: {self.min_length}")
        if self.min_length > self.max_length:
            raise ValueError("min_length cannot be greater than max_length")

    # Validate padding_direction
    if self.padding_direction not in ["left", "right"]:
        raise ValueError(f"Invalid padding direction: {self.padding_direction}")

    # Set min_length to max_length for fixed-length fields
    if self.min_length is None and self.field_type not in [FieldType.LLVAR, FieldType.LLLVAR]:
        self.min_length = self.max_length

    # Validate padding char length
    if self.padding_char is not None and len(self.padding_char) != 1:
        raise ValueError("padding_char must be a single character")

    # Validate encoding
    if not isinstance(self.encoding, str):
        raise ValueError("encoding must be a string")

ISO8583Message dataclass

ISO8583Message(mti: str, fields: dict[int, str], version: ISO8583Version = ISO8583Version.V1987, network: CardNetwork | None = None, raw_message: str | None = None, bitmap: str | None = None)

Represents a complete ISO 8583 message

message_class property

message_class: MessageClass

Get message class from MTI

message_function property

message_function: MessageFunction

Get message function from MTI

message_origin property

message_origin: MessageOrigin

Get message origin from MTI

__post_init__

__post_init__()

Initialize fields after creation

Source code in iso8583sim/core/types.py
138
139
140
141
142
143
144
145
def __post_init__(self):
    """Initialize fields after creation"""
    # Ensure fields dictionary exists
    if not self.fields:
        self.fields = {}
    # Add MTI as field 0
    if self.mti:
        self.fields[0] = self.mti

ISO8583Error

Bases: Exception

Base exception for ISO 8583 errors

ParseError

Bases: ISO8583Error

Raised when parsing fails

ValidationError

Bases: ISO8583Error

Raised when validation fails

BuildError

Bases: ISO8583Error

Raised when message building fails

get_field_definition cached

get_field_definition(field_number: int, network: CardNetwork | None = None, version: ISO8583Version = ISO8583Version.V1987) -> FieldDefinition | None

Get field definition considering network and version specifics.

Parameters:

Name Type Description Default
field_number int

The field number to look up

required
network CardNetwork | None

Optional card network for network-specific definitions

None
version ISO8583Version

ISO8583 version

V1987

Returns:

Type Description
FieldDefinition | None

FieldDefinition if found, None otherwise

Priority order: 1. Network-specific definition 2. Version-specific definition 3. Standard field definition

Note: Results are cached for performance (lru_cache with 512 entries).

Source code in iso8583sim/core/types.py
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
@lru_cache(maxsize=512)
def get_field_definition(
    field_number: int, network: CardNetwork | None = None, version: ISO8583Version = ISO8583Version.V1987
) -> FieldDefinition | None:
    """
    Get field definition considering network and version specifics.

    Args:
        field_number: The field number to look up
        network: Optional card network for network-specific definitions
        version: ISO8583 version

    Returns:
        FieldDefinition if found, None otherwise

    Priority order:
    1. Network-specific definition
    2. Version-specific definition
    3. Standard field definition

    Note: Results are cached for performance (lru_cache with 512 entries).
    """
    # Check network-specific fields first
    if network and network in NETWORK_SPECIFIC_FIELDS:
        if field_number in NETWORK_SPECIFIC_FIELDS[network]:
            return NETWORK_SPECIFIC_FIELDS[network][field_number]

    # Check version-specific variations
    if field_number in VERSION_SPECIFIC_FIELDS[version]:
        return VERSION_SPECIFIC_FIELDS[version][field_number]

    # Fall back to standard fields
    return ISO8583_FIELDS.get(field_number)

is_valid_mti

is_valid_mti(mti: str) -> bool

Check if MTI is valid.

Parameters:

Name Type Description Default
mti str

Message Type Indicator string

required

Returns:

Name Type Description
bool bool

True if MTI is valid, False otherwise

Validates: - Length is 4 digits - All characters are numeric - Each position contains valid values

Source code in iso8583sim/core/types.py
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
def is_valid_mti(mti: str) -> bool:
    """
    Check if MTI is valid.

    Args:
        mti: Message Type Indicator string

    Returns:
        bool: True if MTI is valid, False otherwise

    Validates:
    - Length is 4 digits
    - All characters are numeric
    - Each position contains valid values
    """
    if not mti or len(mti) != 4 or not mti.isdigit():
        return False

    # Validate version (position 1)
    version = mti[0]
    if version not in ["0", "1"]:
        return False

    # Validate message class (position 2)
    message_class = mti[1]
    if message_class not in [m.value for m in MessageClass]:
        return False

    # Validate message function (position 3)
    message_function = mti[2]
    if message_function not in [f.value for f in MessageFunction]:
        return False

    # Validate message origin (position 4)
    message_origin = mti[3]
    if message_origin not in [o.value for o in MessageOrigin]:
        return False

    return True

get_network_required_fields

get_network_required_fields(network: CardNetwork) -> list[int]

Get list of required fields for a specific network.

Parameters:

Name Type Description Default
network CardNetwork

Card network

required

Returns:

Type Description
list[int]

List of required field numbers

Source code in iso8583sim/core/types.py
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
def get_network_required_fields(network: CardNetwork) -> list[int]:
    """
    Get list of required fields for a specific network.

    Args:
        network: Card network

    Returns:
        List of required field numbers
    """
    return NETWORK_REQUIRED_FIELDS.get(network, [])

get_field_format_pattern

get_field_format_pattern(network: CardNetwork, field_number: int) -> str | None

Get network-specific field format pattern.

Parameters:

Name Type Description Default
network CardNetwork

Card network

required
field_number int

Field number

required

Returns:

Type Description
str | None

Regex pattern string if exists, None otherwise

Source code in iso8583sim/core/types.py
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
def get_field_format_pattern(network: CardNetwork, field_number: int) -> str | None:
    """
    Get network-specific field format pattern.

    Args:
        network: Card network
        field_number: Field number

    Returns:
        Regex pattern string if exists, None otherwise
    """
    network_formats = NETWORK_FIELD_FORMATS.get(network, {})
    return network_formats.get(field_number)

is_binary_field

is_binary_field(field_def: FieldDefinition) -> bool

Check if field is binary type.

Parameters:

Name Type Description Default
field_def FieldDefinition

Field definition

required

Returns:

Name Type Description
bool bool

True if field is binary, False otherwise

Source code in iso8583sim/core/types.py
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
def is_binary_field(field_def: FieldDefinition) -> bool:
    """
    Check if field is binary type.

    Args:
        field_def: Field definition

    Returns:
        bool: True if field is binary, False otherwise
    """
    return field_def.field_type == FieldType.BINARY

iso8583sim.core.parser

ISO 8583 message parsing.

parser

EMVTag dataclass

EMVTag(tag: str, length: int, value: str, raw: str)

EMV Tag data structure

ISO8583Parser

ISO8583Parser(version: ISO8583Version = ISO8583Version.V1987, pool: MessagePool | None = None)

Parser for ISO 8583 messages with network support

Initialize the parser.

Parameters:

Name Type Description Default
version ISO8583Version

ISO8583 version to use

V1987
pool MessagePool | None

Optional MessagePool for object reuse in high-throughput scenarios

None
Source code in iso8583sim/core/parser.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def __init__(self, version: ISO8583Version = ISO8583Version.V1987, pool: MessagePool | None = None):
    """
    Initialize the parser.

    Args:
        version: ISO8583 version to use
        pool: Optional MessagePool for object reuse in high-throughput scenarios
    """
    self.version = version
    self._pool = pool
    self._current_position = 0
    self._raw_message = ""
    self._detected_network = None
    self._secondary_bitmap = False
    self._network_fields = {}  # Cache for network-specific field definitions
    # Cache version-specific fields at init time (version doesn't change)
    self._version_fields = VERSION_SPECIFIC_FIELDS.get(version, {})
    self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
    self.logger.debug("Initialized ISO8583Parser with version %s", version.value)

parse

parse(message: str, network: CardNetwork | None = None) -> ISO8583Message

Parse an ISO 8583 message string into an ISO8583Message object

Source code in iso8583sim/core/parser.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def parse(self, message: str, network: CardNetwork | None = None) -> ISO8583Message:
    """Parse an ISO 8583 message string into an ISO8583Message object"""
    try:
        self._raw_message = message
        self._current_position = 0
        self._detected_network = network
        # Cache network-specific fields for faster lookup
        self._network_fields = NETWORK_SPECIFIC_FIELDS.get(network, {}) if network else {}

        # Parse MTI
        mti = self._parse_mti()
        self.logger.debug("Parsed MTI: %s", mti)

        # Parse bitmap
        bitmap = self._parse_bitmap()
        self.logger.debug("Parsed bitmap: %s", bitmap)
        present_fields = self._get_present_fields(bitmap)
        self.logger.debug("Present fields: %s", present_fields)

        # Auto-detect network if not provided
        if not network:
            self._detected_network = self._detect_network(message)
            # Update cached network fields after detection
            self._network_fields = (
                NETWORK_SPECIFIC_FIELDS.get(self._detected_network, {}) if self._detected_network else {}
            )

        self.logger.info(
            "Processing message for network: %s",
            self._detected_network.value if self._detected_network else "Unknown",
        )

        # Parse data fields
        fields = {0: mti}  # MTI is field 0
        for field_number in present_fields:
            try:
                field_def = get_field_definition(field_number, self._detected_network, self.version)

                if field_def is None:
                    self.logger.warning("No definition found for field %d", field_number)
                    continue

                value = self._parse_field(field_number, field_def)
                if value is not None:
                    fields[field_number] = self._format_field_value(field_number, value, field_def)
                    self.logger.debug("Parsed field %d: %s", field_number, fields[field_number])

            except Exception as e:
                self.logger.error("Error parsing field %d: %s", field_number, str(e))
                raise

        # Create message object (use pool if available for better performance)
        if self._pool is not None:
            msg = self._pool.acquire(
                mti=mti,
                fields=fields,
                version=self.version,
                network=self._detected_network,
                raw_message=message,
                bitmap=bitmap,
            )
        else:
            msg = ISO8583Message(
                mti=mti,
                fields=fields,
                version=self.version,
                network=self._detected_network,
                raw_message=message,
                bitmap=bitmap,
            )

        self.logger.info("Successfully parsed message")
        return msg

    except Exception as e:
        self.logger.error("Failed to parse message: %s", str(e))
        raise ParseError(f"Failed to parse message: {str(e)}") from e

parse_file

parse_file(filename: str) -> list[ISO8583Message]

Parse multiple messages from file

Source code in iso8583sim/core/parser.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def parse_file(self, filename: str) -> list[ISO8583Message]:
    """Parse multiple messages from file"""
    self.logger.info("Starting to parse messages from file: %s", filename)
    messages = []

    try:
        with open(filename) as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:
                    continue

                try:
                    self.logger.debug("Parsing message from line %d", line_num)
                    message = self.parse(line)
                    messages.append(message)
                    self.logger.info("Successfully parsed message %d", line_num)
                except Exception as e:
                    self.logger.error("Failed to parse message at line %d: %s", line_num, str(e))
                    raise ParseError(f"Failed to parse message at line {line_num}: {str(e)}") from e

        self.logger.info("Successfully parsed %d messages from file", len(messages))
        return messages

    except Exception as e:
        self.logger.error("Error reading or parsing file: %s", str(e))
        raise ParseError(f"Failed to read or parse file: {str(e)}") from e

iso8583sim.core.builder

ISO 8583 message building.

builder

ISO8583Builder

ISO8583Builder(version: ISO8583Version = ISO8583Version.V1987)

Builder for creating ISO 8583 messages with network support

Source code in iso8583sim/core/builder.py
21
22
23
def __init__(self, version: ISO8583Version = ISO8583Version.V1987):
    self.version = version
    self.validator = ISO8583Validator()

build

build(message: ISO8583Message) -> str

Build raw ISO 8583 message string from message object

Source code in iso8583sim/core/builder.py
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def build(self, message: ISO8583Message) -> str:
    """Build raw ISO 8583 message string from message object"""
    try:
        # Pre-process fields
        processed_fields = {}
        for field_number, value in message.fields.items():
            if field_number == 0:  # Skip MTI
                continue

            field_def = get_field_definition(field_number, message.network, message.version)
            if not field_def:
                raise BuildError(f"Unknown field definition: {field_number}")

            # Format field value
            processed_value = self._format_field_value(field_number, value, field_def)
            processed_fields[field_number] = processed_value

        # Update message with processed fields
        message.fields.update(processed_fields)

        # Validate message
        errors = self.validator.validate_message(message)
        if errors:
            raise BuildError(f"Validation failed: {'; '.join(errors)}")

        # Build message components
        result = message.mti
        bitmap = self._build_bitmap(message.fields)
        result += bitmap

        # Build data fields in order
        present_fields = sorted(f for f in message.fields.keys() if f != 0)
        for field_number in present_fields:
            field_def = get_field_definition(field_number, message.network, message.version)
            if field_def:
                field_data = self._build_field(field_number, message.fields[field_number], field_def)
                result += field_data

        return result

    except BuildError:
        raise
    except Exception as e:
        raise BuildError(f"Failed to build message: {str(e)}") from e

create_message

create_message(mti: str, fields: dict[int, str]) -> ISO8583Message

Create an ISO8583Message object with validation

Source code in iso8583sim/core/builder.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def create_message(self, mti: str, fields: dict[int, str]) -> ISO8583Message:
    """Create an ISO8583Message object with validation"""
    message = ISO8583Message(mti=mti, fields=fields, version=self.version)

    # Validate
    errors = self.validator.validate_message(message)
    if errors:
        raise BuildError(f"Message validation failed: {'; '.join(errors)}")

    # Build raw message
    raw_message = self.build(message)
    message.raw_message = raw_message

    return message

create_response

create_response(request: ISO8583Message, response_fields: dict[int, str]) -> ISO8583Message

Create a response message based on a request message

Source code in iso8583sim/core/builder.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def create_response(self, request: ISO8583Message, response_fields: dict[int, str]) -> ISO8583Message:
    """Create a response message based on a request message"""
    # Create response MTI
    req_mti = request.mti
    resp_mti = list(req_mti)
    resp_mti[2] = MessageFunction.RESPONSE.value
    response_mti = "".join(resp_mti)

    # Copy necessary fields and ensure proper formatting
    response_fields = response_fields.copy()
    copy_fields = [2, 3, 4, 11, 37, 41, 42]
    for field in copy_fields:
        if field in request.fields:
            if field == 3:
                # Ensure processing code is properly formatted
                response_fields[field] = request.fields[field].zfill(6)
            elif field == 42:
                # Ensure merchant ID is properly formatted
                response_fields[field] = request.fields[field].ljust(15, " ")
            else:
                response_fields[field] = request.fields[field]

    return self.create_message(response_mti, response_fields)

create_reversal

create_reversal(original: ISO8583Message, additional_fields: dict[int, str] | None = None) -> ISO8583Message

Create a reversal message

Source code in iso8583sim/core/builder.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def create_reversal(
    self, original: ISO8583Message, additional_fields: dict[int, str] | None = None
) -> ISO8583Message:
    """Create a reversal message"""
    # Create reversal MTI
    orig_mti = original.mti
    rev_mti = f"04{orig_mti[2:]}"

    # Copy fields and ensure proper formatting
    reversal_fields = {}
    for field_num, value in original.fields.items():
        if field_num == 42:  # Merchant ID
            reversal_fields[field_num] = value.ljust(15, " ")
        elif field_num == 3:  # Processing Code
            reversal_fields[field_num] = value.zfill(6)
        else:
            reversal_fields[field_num] = value

    # Add reversal-specific fields
    now = datetime.now()
    reversal_fields.update(
        {
            7: now.strftime("%m%d%H%M%S"),  # Transmission date and time
            39: "00",  # Response code
            90: f"{orig_mti}{original.fields.get('11', '').zfill(6)}".ljust(42, "0"),  # Original elements
        }
    )

    # Add any additional fields
    if additional_fields:
        reversal_fields.update(additional_fields)

    return self.create_message(rev_mti, reversal_fields)

create_network_management_message

create_network_management_message(message_type: str, network: CardNetwork | None = None) -> ISO8583Message

Create a network management message

Source code in iso8583sim/core/builder.py
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
def create_network_management_message(
    self, message_type: str, network: CardNetwork | None = None
) -> ISO8583Message:
    """Create a network management message"""
    fields = {
        70: message_type.zfill(3),  # Network management type
    }

    # Add network-specific fields
    if network == CardNetwork.VISA:
        fields.update(
            {
                53: "0000000000000000",  # Security Info
                96: "0123456789ABCDEF",  # Message Security Code (16 hex chars)
            }
        )
    elif network == CardNetwork.MASTERCARD:
        fields.update({48: "MC00".ljust(4), 53: "0000000000000000"})

    return self.create_message("0800", fields)

build_emv_data

build_emv_data(emv_tags: dict[str, str]) -> str

Build EMV data field (field 55) from tags

Source code in iso8583sim/core/builder.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
def build_emv_data(self, emv_tags: dict[str, str]) -> str:
    """Build EMV data field (field 55) from tags"""
    result = []
    for tag, value in sorted(emv_tags.items()):
        # Validate tag format
        if not all(c in "0123456789ABCDEF" for c in tag.upper()):
            raise BuildError(f"Invalid EMV tag format: {tag}")

        # Validate value format
        if not all(c in "0123456789ABCDEF" for c in value.upper()):
            raise BuildError(f"Invalid EMV value format for tag {tag}")

        # Calculate length
        length = len(value) // 2  # Convert hex length to bytes
        length_hex = hex(length)[2:].upper().zfill(2)

        result.append(tag + length_hex + value)

    return "".join(result)

iso8583sim.core.validator

Message validation with network-specific rules.

validator

ISO8583Validator

ISO8583Validator()

Enhanced validator for ISO 8583 messages with network support

Source code in iso8583sim/core/validator.py
28
29
30
31
32
33
34
35
36
def __init__(self):
    self.network_required_fields = {
        CardNetwork.VISA: [2, 3, 4, 11, 14, 22, 24, 25],
        CardNetwork.MASTERCARD: [2, 3, 4, 11, 22, 24, 25],
        CardNetwork.AMEX: [2, 3, 4, 11, 22, 25],
        CardNetwork.DISCOVER: [2, 3, 4, 11, 22],
        CardNetwork.JCB: [2, 3, 4, 11, 22, 25],
        CardNetwork.UNIONPAY: [2, 3, 4, 11, 22, 25, 49],
    }

validate_field

validate_field(field_number: int, value: str, field_def: FieldDefinition, network: CardNetwork | None = None) -> tuple[bool, str | None]

Validate field value

Source code in iso8583sim/core/validator.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def validate_field(
    self, field_number: int, value: str, field_def: FieldDefinition, network: CardNetwork | None = None
) -> tuple[bool, str | None]:
    """Validate field value"""
    try:
        # Length validation for fixed-length fields
        if field_def.field_type == FieldType.BINARY:
            # For binary fields, length is in bytes but value is in hex
            required_length = field_def.max_length * 2  # Convert bytes to hex chars
            if len(value) != required_length:
                return False, f"Field {field_number} length must be {field_def.max_length * 2} hex chars"
        elif field_def.field_type not in [FieldType.LLVAR, FieldType.LLLVAR]:
            if len(value) != field_def.max_length:
                return False, f"Field {field_number} length must be {field_def.max_length}"
        else:
            # Variable length field validation
            if len(value) > field_def.max_length:
                return False, f"Field {field_number} length cannot exceed {field_def.max_length}"
            if field_def.min_length and len(value) < field_def.min_length:
                return False, f"Field {field_number} length cannot be less than {field_def.min_length}"

        # Type-specific validation (use Cython if available)
        if field_def.field_type == FieldType.NUMERIC:
            is_valid = _is_numeric_fast(value) if _USE_CYTHON else value.isdigit()
            if not is_valid:
                return False, f"Field {field_number} must contain only digits"
        elif field_def.field_type == FieldType.BINARY:
            is_valid = (
                _is_valid_hex_fast(value) if _USE_CYTHON else all(c in "0123456789ABCDEFabcdef" for c in value)
            )
            if not is_valid:
                return False, f"Field {field_number} must be valid hexadecimal"
        elif field_def.field_type == FieldType.ALPHA:
            is_valid = _is_alpha_fast(value) if _USE_CYTHON else value.replace(" ", "").isalpha()
            if not is_valid:
                return False, f"Field {field_number} must contain only letters"
        elif field_def.field_type == FieldType.ALPHANUMERIC:
            is_valid = _is_alphanumeric_fast(value) if _USE_CYTHON else value.replace(" ", "").isalnum()
            if not is_valid:
                return False, f"Field {field_number} must contain only letters and numbers"

        # Field passed all validations
        return True, None

    except Exception as e:
        return False, f"Validation error for field {field_number}: {str(e)}"

validate_message

validate_message(message: ISO8583Message) -> list[str]

Validate complete ISO 8583 message

Source code in iso8583sim/core/validator.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def validate_message(self, message: ISO8583Message) -> list[str]:
    """Validate complete ISO 8583 message"""
    errors = []

    # Validate MTI
    mti_valid, mti_error = self.validate_mti(message.mti)
    if not mti_valid:
        errors.append(mti_error)

    # Validate bitmap if present
    if message.bitmap:
        bitmap_valid, bitmap_error = self.validate_bitmap(message.bitmap)
        if not bitmap_valid:
            errors.append(bitmap_error)

    # Validate fields
    for field_number, value in message.fields.items():
        if field_number == 0:  # MTI already validated
            continue

        # Get field definition considering network and version
        field_def = get_field_definition(field_number, message.network, message.version)

        if not field_def:
            errors.append(f"Unknown field number: {field_number}")
            continue

        valid, error = self.validate_field(field_number, value, field_def)
        if not valid:
            errors.append(error)

    # Network-specific validation
    if message.network:
        network_errors = self.validate_network_compliance(message)
        errors.extend(network_errors)

    return errors

validate_processing_code staticmethod

validate_processing_code(code: str) -> bool

Validate processing code format

Source code in iso8583sim/core/validator.py
188
189
190
191
192
193
194
195
196
197
198
@staticmethod
def validate_processing_code(code: str) -> bool:
    """Validate processing code format"""
    if not code.isdigit() or len(code) != 6:
        return False

    tt = int(code[0:2])  # Transaction Type
    aa = int(code[2:4])  # Account Type (From)
    ss = int(code[4:6])  # Account Type (To)

    return all(0 <= x <= 99 for x in (tt, aa, ss))

validate_mti classmethod

validate_mti(mti: str) -> tuple[bool, str | None]

Validate Message Type Indicator

Parameters:

Name Type Description Default
mti str

4-digit MTI string

required

Returns:

Type Description
tuple[bool, str | None]

(is_valid, error_message)

Source code in iso8583sim/core/validator.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@classmethod
def validate_mti(cls, mti: str) -> tuple[bool, str | None]:
    """
    Validate Message Type Indicator

    Args:
        mti: 4-digit MTI string

    Returns:
        (is_valid, error_message)
    """
    if not mti or len(mti) != 4:
        return False, "MTI must be 4 digits"

    if not mti.isdigit():
        return False, "MTI must contain only digits"

    version = mti[0]
    if version not in ["0", "1"]:
        return False, "MTI version must be 0 or 1"

    message_class = mti[1]
    if message_class not in ["1", "2", "3", "4", "5", "6", "8", "9"]:
        return False, "Invalid MTI message class"

    return True, None

validate_bitmap classmethod

validate_bitmap(bitmap: str) -> tuple[bool, str | None]

Validate bitmap format and content

Parameters:

Name Type Description Default
bitmap str

Hexadecimal bitmap string

required

Returns:

Type Description
tuple[bool, str | None]

(is_valid, error_message)

Source code in iso8583sim/core/validator.py
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
@classmethod
def validate_bitmap(cls, bitmap: str) -> tuple[bool, str | None]:
    """
    Validate bitmap format and content

    Args:
        bitmap: Hexadecimal bitmap string

    Returns:
        (is_valid, error_message)
    """
    if not bitmap:
        return False, "Bitmap is required"

    if len(bitmap) not in [16, 32]:  # 16 bytes for primary, 32 for secondary
        return False, "Invalid bitmap length"

    try:
        # Validate hex format
        int(bitmap, 16)

        # Check if secondary bitmap is present (bit 1)
        has_secondary = len(bitmap) == 32 or (int(bitmap[0], 16) & 0x80)
        if has_secondary and len(bitmap) != 32:
            return False, "Secondary bitmap indicator set but bitmap not 32 bytes"

        return True, None
    except ValueError:
        return False, "Invalid bitmap format"

validate_pan classmethod

validate_pan(pan: str) -> bool

Validate Primary Account Number using Luhn algorithm

Parameters:

Name Type Description Default
pan str

Card number string

required

Returns:

Type Description
bool

True if valid, False otherwise

Source code in iso8583sim/core/validator.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
@classmethod
def validate_pan(cls, pan: str) -> bool:
    """
    Validate Primary Account Number using Luhn algorithm

    Args:
        pan: Card number string

    Returns:
        True if valid, False otherwise
    """
    # Use Cython-optimized version if available
    if _USE_CYTHON:
        return _validate_pan_luhn_fast(pan)

    # Pure Python fallback
    if not pan.isdigit():
        return False

    # Luhn algorithm
    digits = [int(d) for d in pan]
    checksum = 0
    odd_even = len(digits) % 2

    for i in range(len(digits) - 1, -1, -1):
        d = digits[i]
        if i % 2 == odd_even:
            d *= 2
            if d > 9:
                d -= 9
        checksum += d

    return (checksum % 10) == 0

validate_network_compliance

validate_network_compliance(message: ISO8583Message) -> list[str]

Validate network-specific requirements

Source code in iso8583sim/core/validator.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
def validate_network_compliance(self, message: ISO8583Message) -> list[str]:
    """Validate network-specific requirements"""
    errors = []

    if not message.network:
        return errors

    # Check required fields
    required_fields = self.network_required_fields.get(message.network, [])
    for field in required_fields:
        if field not in message.fields:
            errors.append(f"Required field {field} missing for {message.network.value}")

    # Network-specific validations
    if message.network == CardNetwork.VISA:
        if 44 in message.fields:
            if not self._validate_visa_field_44(message.fields[44]):
                errors.append("Invalid format for VISA field 44")

    elif message.network == CardNetwork.MASTERCARD:
        if 48 in message.fields:
            if not message.fields[48].startswith("MC"):
                errors.append("Mastercard field 48 must start with 'MC'")

    return errors

validate_emv_data

validate_emv_data(emv_data: str) -> list[str]

Validate EMV data format (TLV structure)

Source code in iso8583sim/core/validator.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
def validate_emv_data(self, emv_data: str) -> list[str]:
    """Validate EMV data format (TLV structure)"""
    if not emv_data:
        return ["Empty EMV data"]

    errors = []
    position = 0

    try:
        while position < len(emv_data):
            # Need minimum 4 chars (2 for 1-byte tag, 2 for length)
            if position + 4 > len(emv_data):
                errors.append("Incomplete EMV data")
                break

            # Read first byte of tag
            tag_byte1 = emv_data[position : position + 2]
            if not _HEX_2_PATTERN.match(tag_byte1):
                errors.append(f"Invalid tag format: {tag_byte1}")
                break
            position += 2

            # Check if this is a multi-byte tag (bits 1-5 all set = 1F, 5F, 9F, DF)
            first_byte = int(tag_byte1, 16)
            if (first_byte & 0x1F) == 0x1F:
                # Multi-byte tag - read second byte
                if position + 2 > len(emv_data):
                    errors.append(f"Incomplete multi-byte tag starting with {tag_byte1}")
                    break
                tag_byte2 = emv_data[position : position + 2]
                if not _HEX_2_PATTERN.match(tag_byte2):
                    errors.append(f"Invalid second byte of tag: {tag_byte2}")
                    break
                tag = tag_byte1 + tag_byte2
                position += 2
            else:
                tag = tag_byte1

            # Check length format (2 hex chars for 1-byte length)
            if position + 2 > len(emv_data):
                errors.append(f"Missing length for tag {tag}")
                break

            length_hex = emv_data[position : position + 2]
            try:
                length = int(length_hex, 16)
            except ValueError:
                errors.append(f"Invalid length format for tag {tag}")
                break
            position += 2

            # Check value format (length * 2 hex chars)
            value_length = length * 2  # Each byte is 2 hex chars
            if position + value_length > len(emv_data):
                errors.append(f"Incomplete value for tag {tag}")
                break

            value = emv_data[position : position + value_length]
            if len(value) != value_length or not _HEX_PATTERN.match(value):
                errors.append(f"Invalid value format for tag {tag}")
                break

            position += value_length

        return errors

    except Exception as e:
        return [f"EMV validation error: {str(e)}"]

validate_field_compatibility

validate_field_compatibility(field_number: int, value: str, version: ISO8583Version) -> list[str]

Validate field compatibility with ISO version

Parameters:

Name Type Description Default
field_number int

Field number to validate

required
value str

Field value

required
version ISO8583Version

ISO8583 version

required

Returns:

Type Description
list[str]

List of compatibility errors

Source code in iso8583sim/core/validator.py
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
def validate_field_compatibility(self, field_number: int, value: str, version: ISO8583Version) -> list[str]:
    """
    Validate field compatibility with ISO version

    Args:
        field_number: Field number to validate
        value: Field value
        version: ISO8583 version

    Returns:
        List of compatibility errors
    """
    errors = []

    # Get version-specific field definition
    field_def = get_field_definition(field_number, version=version)
    if not field_def:
        return [f"Field {field_number} not defined in ISO8583:{version.value}"]

    # Check length compatibility
    if len(value) > field_def.max_length:
        errors.append(
            f"Field {field_number} length {len(value)} exceeds "
            f"maximum {field_def.max_length} for version {version.value}"
        )

    # Check type compatibility
    if field_def.field_type == FieldType.BINARY and not _HEX_PATTERN.match(value):
        errors.append(f"Field {field_number} must be hexadecimal in version {version.value}")

    # Version-specific validations
    if version == ISO8583Version.V1987:
        if field_number == 43 and len(value) > 40:
            errors.append("Field 43 maximum length is 40 in ISO8583:1987")
    elif version == ISO8583Version.V1993:
        if field_number == 43 and len(value) > 99:
            errors.append("Field 43 maximum length is 99 in ISO8583:1993")
    elif version == ISO8583Version.V2003:
        if field_number == 43 and len(value) > 256:
            errors.append("Field 43 maximum length is 256 in ISO8583:2003")

    return errors

iso8583sim.core.emv

EMV/ICC chip card data handling.

emv

EMV (Europay, Mastercard, Visa) TLV data handling.

This module provides functions for parsing and building EMV Tag-Length-Value encoded data, commonly found in ISO 8583 Field 55.

parse_emv_data

parse_emv_data(data: str) -> dict[str, str]

Parse EMV TLV data into a dictionary of tags and values.

Parameters:

Name Type Description Default
data str

Hex-encoded EMV TLV data

required

Returns:

Type Description
dict[str, str]

Dictionary mapping tag strings to value strings (hex-encoded)

Raises:

Type Description
ValueError

If data is malformed

Source code in iso8583sim/core/emv.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def parse_emv_data(data: str) -> dict[str, str]:
    """Parse EMV TLV data into a dictionary of tags and values.

    Args:
        data: Hex-encoded EMV TLV data

    Returns:
        Dictionary mapping tag strings to value strings (hex-encoded)

    Raises:
        ValueError: If data is malformed
    """
    result: dict[str, str] = {}
    pos = 0
    data = data.upper()

    while pos < len(data):
        # Parse tag (1 or 2 bytes)
        if pos + 2 > len(data):
            break

        tag_byte1 = int(data[pos : pos + 2], 16)
        pos += 2

        # Check if tag is 2 bytes (if low 5 bits of first byte are all 1s)
        if (tag_byte1 & 0x1F) == 0x1F:
            if pos + 2 > len(data):
                break
            tag = f"{tag_byte1:02X}{data[pos : pos + 2]}"
            pos += 2
        else:
            tag = f"{tag_byte1:02X}"

        # Parse length (1, 2, or 3 bytes)
        if pos + 2 > len(data):
            break

        length_byte1 = int(data[pos : pos + 2], 16)
        pos += 2

        if length_byte1 <= 0x7F:
            # Short form: length is in this byte
            length = length_byte1
        elif length_byte1 == 0x81:
            # Long form: next byte is length
            if pos + 2 > len(data):
                break
            length = int(data[pos : pos + 2], 16)
            pos += 2
        elif length_byte1 == 0x82:
            # Long form: next 2 bytes are length
            if pos + 4 > len(data):
                break
            length = int(data[pos : pos + 4], 16)
            pos += 4
        else:
            # Invalid length encoding
            break

        # Parse value
        value_end = pos + (length * 2)
        if value_end > len(data):
            # Truncated value - take what we have
            value_end = len(data)

        value = data[pos:value_end]
        pos = value_end

        result[tag] = value

    return result

build_emv_data

build_emv_data(tags: dict[str, str]) -> str

Build EMV TLV data from a dictionary of tags and values.

Parameters:

Name Type Description Default
tags dict[str, str]

Dictionary mapping tag strings to value strings (hex-encoded)

required

Returns:

Type Description
str

Hex-encoded EMV TLV data string

Source code in iso8583sim/core/emv.py
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def build_emv_data(tags: dict[str, str]) -> str:
    """Build EMV TLV data from a dictionary of tags and values.

    Args:
        tags: Dictionary mapping tag strings to value strings (hex-encoded)

    Returns:
        Hex-encoded EMV TLV data string
    """
    result = []

    for tag, value in tags.items():
        # Normalize tag and value
        tag = tag.upper()
        value = value.upper()

        # Add tag
        result.append(tag)

        # Calculate length (in bytes)
        length = len(value) // 2

        # Encode length
        if length <= 0x7F:
            result.append(f"{length:02X}")
        elif length <= 0xFF:
            result.append(f"81{length:02X}")
        else:
            result.append(f"82{length:04X}")

        # Add value
        result.append(value)

    return "".join(result)

get_tag_name

get_tag_name(tag: str) -> str

Get the description of an EMV tag.

Parameters:

Name Type Description Default
tag str

Tag identifier (hex string)

required

Returns:

Type Description
str

Description of the tag, or "Unknown" if not found

Source code in iso8583sim/core/emv.py
233
234
235
236
237
238
239
240
241
242
def get_tag_name(tag: str) -> str:
    """Get the description of an EMV tag.

    Args:
        tag: Tag identifier (hex string)

    Returns:
        Description of the tag, or "Unknown" if not found
    """
    return EMV_TAGS.get(tag.upper(), "Unknown")

explain_tvr

explain_tvr(tvr_hex: str) -> list[str]

Explain Terminal Verification Results.

Parameters:

Name Type Description Default
tvr_hex str

5-byte TVR as hex string (10 characters)

required

Returns:

Type Description
list[str]

List of issues/flags that are set

Source code in iso8583sim/core/emv.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def explain_tvr(tvr_hex: str) -> list[str]:
    """Explain Terminal Verification Results.

    Args:
        tvr_hex: 5-byte TVR as hex string (10 characters)

    Returns:
        List of issues/flags that are set
    """
    issues = []

    if len(tvr_hex) < 10:
        tvr_hex = tvr_hex.ljust(10, "0")

    tvr_bytes = bytes.fromhex(tvr_hex)

    # Byte 1
    byte1_flags = [
        (0x80, "Offline data authentication not performed"),
        (0x40, "SDA failed"),
        (0x20, "ICC data missing"),
        (0x10, "Card appears on terminal exception file"),
        (0x08, "DDA failed"),
        (0x04, "CDA failed"),
    ]

    # Byte 2
    byte2_flags = [
        (0x80, "ICC and terminal have different application versions"),
        (0x40, "Expired application"),
        (0x20, "Application not yet effective"),
        (0x10, "Requested service not allowed for card product"),
        (0x08, "New card"),
    ]

    # Byte 3
    byte3_flags = [
        (0x80, "Cardholder verification was not successful"),
        (0x40, "Unrecognized CVM"),
        (0x20, "PIN Try Limit exceeded"),
        (0x10, "PIN entry required and PIN pad not present or not working"),
        (0x08, "PIN entry required, PIN pad present, but PIN was not entered"),
        (0x04, "Online PIN entered"),
    ]

    # Byte 4
    byte4_flags = [
        (0x80, "Transaction exceeds floor limit"),
        (0x40, "Lower consecutive offline limit exceeded"),
        (0x20, "Upper consecutive offline limit exceeded"),
        (0x10, "Transaction selected randomly for online processing"),
        (0x08, "Merchant forced transaction online"),
    ]

    # Byte 5
    byte5_flags = [
        (0x80, "Default TDOL used"),
        (0x40, "Issuer authentication failed"),
        (0x20, "Script processing failed before final GENERATE AC"),
        (0x10, "Script processing failed after final GENERATE AC"),
    ]

    all_flags = [byte1_flags, byte2_flags, byte3_flags, byte4_flags, byte5_flags]

    for i, byte_flags in enumerate(all_flags):
        if i < len(tvr_bytes):
            for mask, description in byte_flags:
                if tvr_bytes[i] & mask:
                    issues.append(description)

    return issues

explain_cid

explain_cid(cid_hex: str) -> str

Explain Cryptogram Information Data.

Parameters:

Name Type Description Default
cid_hex str

CID byte as hex string

required

Returns:

Type Description
str

Description of the cryptogram type

Source code in iso8583sim/core/emv.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
def explain_cid(cid_hex: str) -> str:
    """Explain Cryptogram Information Data.

    Args:
        cid_hex: CID byte as hex string

    Returns:
        Description of the cryptogram type
    """
    cid_value = int(cid_hex, 16) if cid_hex else 0
    cryptogram_type = (cid_value >> 6) & 0x03

    types = {
        0: "AAC (Application Authentication Cryptogram) - Transaction declined",
        1: "TC (Transaction Certificate) - Transaction approved offline",
        2: "ARQC (Authorization Request Cryptogram) - Online authorization requested",
        3: "RFU (Reserved for Future Use)",
    }

    return types.get(cryptogram_type, "Unknown")

iso8583sim.core.pool

Object pooling for high-throughput scenarios.

pool

Object pooling for high-throughput ISO8583 message processing.

MessagePool

MessagePool(size: int = 100)

Thread-safe object pool for ISO8583Message instances.

Using a pool reduces object allocation overhead in high-throughput scenarios by reusing message objects instead of creating new ones.

Usage

pool = MessagePool(size=100)

Get a message from the pool

msg = pool.acquire("0100", {2: "4111111111111111"})

Process the message...

Return it to the pool when done

pool.release(msg)

Note: Messages returned to the pool have their fields cleared, so don't hold references to field data after releasing.

Initialize the message pool.

Parameters:

Name Type Description Default
size int

Maximum number of messages to keep in the pool

100
Source code in iso8583sim/core/pool.py
34
35
36
37
38
39
40
41
42
43
def __init__(self, size: int = 100):
    """
    Initialize the message pool.

    Args:
        size: Maximum number of messages to keep in the pool
    """
    self._pool: deque[ISO8583Message] = deque(maxlen=size)
    self._lock = Lock()
    self._max_size = size

size property

size: int

Get current number of messages in the pool.

max_size property

max_size: int

Get maximum pool size.

acquire

acquire(mti: str, fields: dict[int, str], version: ISO8583Version = ISO8583Version.V1987, network: CardNetwork | None = None, raw_message: str | None = None, bitmap: str | None = None) -> ISO8583Message

Get a message from the pool or create a new one.

Parameters:

Name Type Description Default
mti str

Message Type Indicator

required
fields dict[int, str]

Message fields dictionary

required
version ISO8583Version

ISO8583 version

V1987
network CardNetwork | None

Card network

None
raw_message str | None

Original raw message string

None
bitmap str | None

Message bitmap

None

Returns:

Type Description
ISO8583Message

An ISO8583Message instance (either recycled or new)

Source code in iso8583sim/core/pool.py
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def acquire(
    self,
    mti: str,
    fields: dict[int, str],
    version: ISO8583Version = ISO8583Version.V1987,
    network: CardNetwork | None = None,
    raw_message: str | None = None,
    bitmap: str | None = None,
) -> ISO8583Message:
    """
    Get a message from the pool or create a new one.

    Args:
        mti: Message Type Indicator
        fields: Message fields dictionary
        version: ISO8583 version
        network: Card network
        raw_message: Original raw message string
        bitmap: Message bitmap

    Returns:
        An ISO8583Message instance (either recycled or new)
    """
    msg = None

    with self._lock:
        if self._pool:
            msg = self._pool.pop()

    if msg is not None:
        # Reset the pooled message with new values
        msg.mti = mti
        msg.fields = fields.copy() if fields else {}
        msg.version = version
        msg.network = network
        msg.raw_message = raw_message
        msg.bitmap = bitmap
        # Re-run post_init logic
        if mti:
            msg.fields[0] = mti
        return msg

    # No pooled message available, create new one
    return ISO8583Message(
        mti=mti,
        fields=fields.copy() if fields else {},
        version=version,
        network=network,
        raw_message=raw_message,
        bitmap=bitmap,
    )

release

release(msg: ISO8583Message) -> None

Return a message to the pool for reuse.

Parameters:

Name Type Description Default
msg ISO8583Message

The message to return to the pool

required

Note: The message's fields are cleared to avoid holding references to old data.

Source code in iso8583sim/core/pool.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def release(self, msg: ISO8583Message) -> None:
    """
    Return a message to the pool for reuse.

    Args:
        msg: The message to return to the pool

    Note: The message's fields are cleared to avoid holding
    references to old data.
    """
    # Clear the message to free memory
    msg.fields.clear()
    msg.raw_message = None
    msg.bitmap = None

    with self._lock:
        if len(self._pool) < self._max_size:
            self._pool.append(msg)

clear

clear() -> None

Clear all messages from the pool.

Source code in iso8583sim/core/pool.py
116
117
118
119
def clear(self) -> None:
    """Clear all messages from the pool."""
    with self._lock:
        self._pool.clear()

get_default_pool

get_default_pool(size: int = 100) -> MessagePool

Get or create the default global message pool.

Parameters:

Name Type Description Default
size int

Pool size (only used on first call)

100

Returns:

Type Description
MessagePool

The default MessagePool instance

Source code in iso8583sim/core/pool.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def get_default_pool(size: int = 100) -> MessagePool:
    """
    Get or create the default global message pool.

    Args:
        size: Pool size (only used on first call)

    Returns:
        The default MessagePool instance
    """
    global _default_pool
    if _default_pool is None:
        _default_pool = MessagePool(size=size)
    return _default_pool

reset_default_pool

reset_default_pool() -> None

Reset the default global pool.

Source code in iso8583sim/core/pool.py
153
154
155
156
157
158
def reset_default_pool() -> None:
    """Reset the default global pool."""
    global _default_pool
    if _default_pool is not None:
        _default_pool.clear()
    _default_pool = None