More PEP8 standardisation

This commit is contained in:
László Károlyi 2018-04-22 16:27:27 +02:00
parent 9b912894ae
commit 3da56fd004
Signed by: karolyi
GPG key ID: 2DCAF25E55735BFE
23 changed files with 499 additions and 462 deletions

View file

@ -1 +0,0 @@
from ecrterm import common, conv, crc, ecr

View file

@ -189,91 +189,90 @@ TERMINAL_STATUS_CODES = {
DEBUG_PACKET_NAME = {
[0x0F, None]: 'RFU for proprietary applications, the utilisation for '
(0x0F, None): 'RFU for proprietary applications, the utilisation for '
'particular cases should be clarified between manufacturers',
[0x01, 0x01]: 'RFU',
[0x04, 0x01]: 'Set Date and Time in ECR',
[0x04, 0x0E]: 'Menu-Request',
[0x04, 0x0F]: 'Status-Information',
[0x04, 0xFF]: 'Intermediate-Statusinformation',
[0x05, 0x01]: 'Status-Enquiry',
[0x05, 0xFF]: 'RFU',
[0x06, 0x00]: 'Registration',
[0x06, 0x01]: 'Authorisation',
[0x06, 0x02]: 'Log-Off',
[0x06, 0x03]: 'Account Balance Request',
[0x06, 0x09]: 'Prepaid Top-Up',
[0x06, 0x0A]: 'Tax Free',
[0x06, 0x0B]: 'RFU',
[0x06, 0x0C]: 'TIP',
[0x06, 0x0F]: 'Completion',
[0x06, 0x10]: 'Send Turnover Totals',
[0x06, 0x11]: 'RFU',
[0x06, 0x12]: 'Print Turnover Receipts',
[0x06, 0x18]: 'Reset Terminal',
[0x06, 0x1A]: 'Print System Configuration',
[0x06, 0x1B]: 'Set/Reset Terminal-ID',
[0x06, 0x1E]: 'Abort',
[0x06, 0x20]: 'Repeat Receipt',
[0x06, 0x21]: 'Telephonic Authorisation',
[0x06, 0x22]: 'Pre-Authorisation/Reservation',
[0x06, 0x23]:
(0x01, 0x01): 'RFU',
(0x04, 0x01): 'Set Date and Time in ECR',
(0x04, 0x0E): 'Menu-Request',
(0x04, 0x0F): 'Status-Information',
(0x04, 0xFF): 'Intermediate-Statusinformation',
(0x05, 0x01): 'Status-Enquiry',
(0x05, 0xFF): 'RFU',
(0x06, 0x00): 'Registration',
(0x06, 0x01): 'Authorisation',
(0x06, 0x02): 'Log-Off',
(0x06, 0x03): 'Account Balance Request',
(0x06, 0x09): 'Prepaid Top-Up',
(0x06, 0x0A): 'Tax Free',
(0x06, 0x0B): 'RFU',
(0x06, 0x0C): 'TIP',
(0x06, 0x0F): 'Completion',
(0x06, 0x10): 'Send Turnover Totals',
(0x06, 0x11): 'RFU',
(0x06, 0x12): 'Print Turnover Receipts',
(0x06, 0x18): 'Reset Terminal',
(0x06, 0x1A): 'Print System Configuration',
(0x06, 0x1B): 'Set/Reset Terminal-ID',
(0x06, 0x1E): 'Abort',
(0x06, 0x20): 'Repeat Receipt',
(0x06, 0x21): 'Telephonic Authorisation',
(0x06, 0x22): 'Pre-Authorisation/Reservation',
(0x06, 0x23):
'Partial-Reversal of a Pre-Authorisation/Booking of a Reservation',
[0x06, 0x24]: 'Book Total',
[0x06, 0x25]: 'Pre-Authorisation Reversal',
[0x06, 0x30]: 'Reversal',
[0x06, 0x31]: 'Refund',
[0x06, 0x50]: 'End-of-Day',
[0x06, 0x51]: 'Send offline Transactions',
[0x06, 0x70]: 'Diagnosis',
[0x06, 0x79]: 'Selftest',
[0x06, 0x82]: 'RFU',
[0x06, 0x85]: 'Display Text (only included for downwards-compatibility, '
(0x06, 0x24): 'Book Total',
(0x06, 0x25): 'Pre-Authorisation Reversal',
(0x06, 0x30): 'Reversal',
(0x06, 0x31): 'Refund',
(0x06, 0x50): 'End-of-Day',
(0x06, 0x51): 'Send offline Transactions',
(0x06, 0x70): 'Diagnosis',
(0x06, 0x79): 'Selftest',
(0x06, 0x82): 'RFU',
(0x06, 0x85): 'Display Text (only included for downwards-compatibility, '
'for new implementations use 06 E0)',
[0x06, 0x86]: 'Display Text with Numerical Input (only included for '
(0x06, 0x86): 'Display Text with Numerical Input (only included for '
'downwards-compatibility, for new implementations use 06 E2)',
[0x06, 0x87]: 'PIN-Verification for Customer-Card (only included for '
(0x06, 0x87): 'PIN-Verification for Customer-Card (only included for '
'downwards-compatibility, for new implementations use 06 E3)',
[0x06, 0x88]: 'Display Text with Function-Key Input (only included for '
(0x06, 0x88): 'Display Text with Function-Key Input (only included for '
'downwards-compatibility, for new implementations use 06 E1)',
[0x06, 0x90]: 'RFU',
[0x06, 0x91]: 'Set Date and Time in PT',
[0x06, 0x93]: 'Initialisation',
[0x06, 0x95]: 'Change Password',
[0x06, 0xB0]: 'Abort',
[0x06, 0xC0]: 'Read Card',
[0x06, 0xCE]: 'RFU',
[0x06, 0xD1]: 'Print Line',
[0x06, 0xD3]: 'Print Text-Block',
[0x06, 0xD4]: 'RFU',
[0x06, 0xD8]: 'Dial-Up',
[0x06, 0xD9]: 'Transmit Data via Dial-Up',
[0x06, 0xDA]: 'Receive Data via Dial-Up ',
[0x06, 0xDB]: 'Hang-Up',
[0x06, 0xDD]: 'Transparent-Mode',
[0x06, 0xE0]: 'Display Text',
[0x06, 0xE1]: 'Display Text with Function-Key Input',
[0x06, 0xE2]: 'Display Text with Numerical Input',
[0x06, 0xE3]: 'PIN-Verification for Customer-Card',
[0x06, 0xE4]: 'Blocked-List Query to ECR',
[0x08, 0x01]: 'Activate Service-Mode',
[0x08, 0x02]: 'Switch Protocol',
[0x08, 0x10]: 'Software-Update',
[0x08, 0x11]: 'Read File',
[0x08, 0x12]: 'Delete File',
[0x08, 0x20]: 'Start OPT Action',
[0x08, 0x21]: 'Set OPT Point-in-Time',
[0x08, 0x22]: 'OPT-Pre-Initialisation',
[0x08, 0x23]: 'Output OPT-Data',
[0x08, 0x24]: 'OPT Out-of-Order',
[0x08, 0x30]: 'Select Language',
[0x08, 0x40]: 'Change Baudrate',
[0x08, 0x50]: 'Activate Card-Reader',
[0x0F, None]: 'reserved for proprietary extensions ',
[0x80, 0x00]: 'Positive Acknowledgement',
[0x84, 0x00]: 'Positive Acknowledgement',
[0x84, None]: 'Negative Acknowledgement',
[0x84, 0x9C]: 'Repeat Statusinfo'
(0x06, 0x90): 'RFU',
(0x06, 0x91): 'Set Date and Time in PT',
(0x06, 0x93): 'Initialisation',
(0x06, 0x95): 'Change Password',
(0x06, 0xB0): 'Abort',
(0x06, 0xC0): 'Read Card',
(0x06, 0xCE): 'RFU',
(0x06, 0xD1): 'Print Line',
(0x06, 0xD3): 'Print Text-Block',
(0x06, 0xD4): 'RFU',
(0x06, 0xD8): 'Dial-Up',
(0x06, 0xD9): 'Transmit Data via Dial-Up',
(0x06, 0xDA): 'Receive Data via Dial-Up ',
(0x06, 0xDB): 'Hang-Up',
(0x06, 0xDD): 'Transparent-Mode',
(0x06, 0xE0): 'Display Text',
(0x06, 0xE1): 'Display Text with Function-Key Input',
(0x06, 0xE2): 'Display Text with Numerical Input',
(0x06, 0xE3): 'PIN-Verification for Customer-Card',
(0x06, 0xE4): 'Blocked-List Query to ECR',
(0x08, 0x01): 'Activate Service-Mode',
(0x08, 0x02): 'Switch Protocol',
(0x08, 0x10): 'Software-Update',
(0x08, 0x11): 'Read File',
(0x08, 0x12): 'Delete File',
(0x08, 0x20): 'Start OPT Action',
(0x08, 0x21): 'Set OPT Point-in-Time',
(0x08, 0x22): 'OPT-Pre-Initialisation',
(0x08, 0x23): 'Output OPT-Data',
(0x08, 0x24): 'OPT Out-of-Order',
(0x08, 0x30): 'Select Language',
(0x08, 0x40): 'Change Baudrate',
(0x08, 0x50): 'Activate Card-Reader',
(0x80, 0x00): 'Positive Acknowledgement',
(0x84, 0x00): 'Positive Acknowledgement',
(0x84, None): 'Negative Acknowledgement',
(0x84, 0x9C): 'Repeat Statusinfo'
}
@ -318,19 +317,3 @@ class Transport(Logling):
def send(self, message, *args, **kwargs):
"""Send data."""
class ZVTException(Exception):
"""Base exception for ZVT errors."""
class TransportLayerException(ZVTException):
pass
class TransportTimeoutException(TransportLayerException):
pass
class ApplicationLayerException(ZVTException):
pass

View file

@ -1,4 +1,5 @@
"""smartcard.util package
"""
smartcard.util package
__author__ = "http://www.gemalto.com"
@ -113,43 +114,43 @@ def toBytes(bytestring):
"""GSM3.38 character conversion table."""
__dic_GSM_3_38__ = {
'@': 0x00, # @ At symbol
'£': 0x01, # £ Britain pound symbol
chr(0xa3): 0x01, # £ Britain pound symbol
'$': 0x02, # $ Dollar symbol
chr(0xA5): 0x03, # ¥ Yen symbol
'è': 0x04, # è e accent grave
'é': 0x05, # é e accent aigu
'ù': 0x06, # ù u accent grave
chr(0xEC): 0x07, # ì i accent grave
chr(0xF2): 0x08, # ò o accent grave
chr(0xC7): 0x09, # Ç C majuscule cedille
chr(0xA5): 0x03, # ¥ Yen symbol
chr(0xe8): 0x04, # è e accent grave
chr(0xe9): 0x05, # é e accent aigu
chr(0xf9): 0x06, # ù u accent grave
chr(0xEC): 0x07, # ì i accent grave
chr(0xF2): 0x08, # ò o accent grave
chr(0xC7): 0x09, # Ç C majuscule cedille
chr(0x0A): 0x0A, # LF Line Feed
chr(0xD8): 0x0B, # Ø O majuscule barré
chr(0xF8): 0x0C, # ø o minuscule barré
chr(0xD8): 0x0B, # Ø O majuscule barré
chr(0xF8): 0x0C, # ø o minuscule barré
chr(0x0D): 0x0D, # CR Carriage Return
chr(0xC5): 0x0E, # Å Angstroem majuscule
chr(0xE5): 0x0F, # å Angstroem minuscule
chr(0xC5): 0x0E, # Å Angstroem majuscule
chr(0xE5): 0x0F, # å Angstroem minuscule
'_': 0x11, # underscore
chr(0xC6): 0x1C, # Æ majuscule ae
chr(0xE6): 0x1D, # æ minuscule ae
chr(0xDF): 0x1E, # ß s dur allemand
chr(0xC9): 0x1F, # É majuscule é
chr(0xC6): 0x1C, # Æ majuscule ae
chr(0xE6): 0x1D, # æ minuscule ae
chr(0xDF): 0x1E, # ß s dur allemand
chr(0xC9): 0x1F, # É majuscule é
' ': 0x20,
'!': 0x21,
'\"': 0x22, # guillemet
'#': 0x23,
'¤': 0x24, # ¤ carré
chr(0xA1): 0x40, # ¡ point d'exclamation renversé
chr(0xC4): 0x5B, # Ä majuscule A trema
chr(0xE4): 0x7B, # ä minuscule a trema
chr(0xD6): 0x5C, # Ö majuscule O trema
chr(0xF6): 0x7C, # ö minuscule o trema
chr(0xD1): 0x5D, # Ñ majuscule N tilda espagnol
chr(0xF1): 0x7D, # ñ minuscule n tilda espagnol
chr(0xDC): 0x5E, # Ü majuscule U trema
chr(0xFC): 0x7E, # ü minuscule u trema
chr(0xA7): 0x5F, # § signe paragraphe
chr(0xBF): 0x60, # ¿ point interrogation renversé
'à': 0x7F # a accent grave
chr(0xa4): 0x24, # ¤ carré
chr(0xA1): 0x40, # ¡ point d'exclamation renversé
chr(0xC4): 0x5B, # Ä majuscule A trema
chr(0xE4): 0x7B, # ä minuscule a trema
chr(0xD6): 0x5C, # Ö majuscule O trema
chr(0xF6): 0x7C, # ö minuscule o trema
chr(0xD1): 0x5D, # Ñ majuscule N tilda espagnol
chr(0xF1): 0x7D, # ñ minuscule n tilda espagnol
chr(0xDC): 0x5E, # Ü majuscule U trema
chr(0xFC): 0x7E, # ü minuscule u trema
chr(0xA7): 0x5F, # § signe paragraphe
chr(0xBF): 0x60, # ¿ point interrogation renversé
chr(0xe0): 0x7F # à a accent grave
}
@ -162,7 +163,7 @@ def toGSM3_38Bytes(stringtoconvert):
returns a list of bytes
example:
toGSM3_38Bytes("@ùPascal") returns [ 0x00, 0x06, 0x50, 0x61, 0x73, 0x63,
toGSM3_38Bytes("@ùPascal") returns [ 0x00, 0x06, 0x50, 0x61, 0x73, 0x63,
0x61, 0x6C ]
"""

View file

@ -3,15 +3,13 @@
CRC Funktionen
"""
import sys
if sys.version_info[0] == 2:
range = xrange
from six.moves import range
def build_codetable(poly):
"""
builds an ascii codetable for a polynome to be used by a crc checksum
Builds an ascii codetable for a polynome to be used by a crc
checksum.
"""
crc_table = []
for i in range(256):

View file

@ -11,8 +11,14 @@ import time
from ecrterm import transmission
from ecrterm.common import TERMINAL_STATUS_CODES
from ecrterm.packets import *
from ecrterm.transmission.signals import *
from ecrterm.conv import bs2hl, toBytes, toHexString
from ecrterm.exceptions import TransportLayerException
from ecrterm.packets.apdu import Packets
from ecrterm.packets.base_packets import (
Authorisation, Completion, EndOfDay, Packet, PrintLine, Registration,
ResetTerminal, ShowText, StatusEnquiry, StatusInformation)
from ecrterm.packets.bmp import BCD
from ecrterm.transmission.signals import ACK, DLE, ETX, NAK, STX, TRANSMIT_OK
from ecrterm.utils import is_stringlike
@ -29,13 +35,13 @@ def dismantle_serial_packet(data):
crc = None
i = 2
header = data[:i]
# header = conv.bs2hl(header)
# header = bs2hl(header)
# test if there was a transmission:
if header == []:
raise common.TransportLayerException('No Header')
raise TransportLayerException('No Header')
# test our header to be valid
if header != [DLE, STX]:
raise common.TransportLayerException("Header Error: %s" % header)
raise TransportLayerException("Header Error: %s" % header)
# read until DLE, ETX is reached.
dle = False
while not crc and i < len(data):
@ -68,13 +74,13 @@ def parse_represented_data(data):
# represented data
if is_stringlike(data):
# we assume a bytelist like 10 02 03....
data = conv.toBytes(data)
data = toBytes(data)
# first of all, serial data starts with 10 02, so everything
# starting with 10 will be assumed as "serial packet" and first "demantled"
if data[0] == DLE:
try:
crc, data = dismantle_serial_packet(data)
except common.TransportLayerException:
except TransportLayerException:
pass
elif data[0] == ACK:
if len(data) == 1:
@ -94,10 +100,10 @@ def ecr_log(data, incoming=False):
else:
incoming = '>'
if is_stringlike(data):
data = conv.bs2hl(data)
data = bs2hl(data)
# logit to the logfile
try:
_logfile.write('%s %s\n' % (incoming, conv.toHexString(data)))
_logfile.write('%s %s\n' % (incoming, toHexString(data)))
except:
pass
try:
@ -107,7 +113,7 @@ def ecr_log(data, incoming=False):
print("DEBUG: Cannot be represented: %s" % data)
print(e)
_logfile.write('? did not understand ?\n')
data = conv.toHexString(data)
data = toHexString(data)
print("%s %s" % (incoming, data))
except:
import traceback
@ -193,10 +199,10 @@ class ECR(object):
return ret
def _end_of_day_info_packet(self, history=None):
'''
search for an end of day packet status information in the last packets
can also search in any history list.
'''
"""
Search for an end of day packet status information in the last
packets, can also search in any history list.
"""
# helper function to scan for end of day information via packets.
status_info = None
plist = history or self.transmitter.last_history
@ -217,8 +223,8 @@ class ECR(object):
@returns: 0 if there were no protocol errors.
"""
#old_histoire = self.transmitter.history
#self.transmitter.history = []
# old_histoire = self.transmitter.history
# self.transmitter.history = []
# we send the packet
result = self.transmit(EndOfDay(self.password))
# now save the log
@ -237,13 +243,14 @@ class ECR(object):
def last_printout(self):
"""
returns all printlines from the last history.
@todo: TextBlock support - if some printer decides to do it that way.
returns all printlines from the last history.
@todo: TextBlock support - if some printer decides to do it that
way.
"""
printout = []
for entry in self.transmitter.last_history:
inc, packet = entry
#old_histoire += [(inc, packet)]
# old_histoire += [(inc, packet)]
if inc and isinstance(packet, PrintLine):
printout += [packet.fixed_values['text']]
return printout
@ -334,7 +341,8 @@ class ECR(object):
if not self.version:
self.version = self.last.completion.fixed_values.get(
'sw-version', None)
return self.last.completion.fixed_values.get('terminal-status', None)
return self.last.completion.fixed_values.get(
'terminal-status', None)
# no completion means some error.
return False

22
ecrterm/exceptions.py Normal file
View file

@ -0,0 +1,22 @@
class NotEnoughData(Exception):
"""Raised if the APDU has not enough data to make sense."""
class ZVTException(Exception):
"""Base exception for ZVT errors."""
class TransportLayerException(ZVTException):
pass
class TransportTimeoutException(TransportLayerException):
pass
class ApplicationLayerException(ZVTException):
pass
class TransmissionException(ApplicationLayerException):
pass

View file

@ -1,7 +0,0 @@
# -*- coding: utf-8 -*-
"""
Packets.
@author g4b
"""
from ecrterm.packets.base_packets import *

View file

@ -1,21 +1,19 @@
"""Classes and Functions which deal with the APDU Layer."""
import sys
from logging import debug
from ecrterm import conv
from six.moves import range
from ecrterm.conv import toBytes
from ecrterm.exceptions import NotEnoughData
from ecrterm.packets.bitmaps import BITMAPS_ARGS
from ecrterm.packets.bmp import BMP, int_word_split
from ecrterm.utils import is_stringlike
if sys.version_info[0] == 2:
range = xrange
class _PacketRegister:
"""
All Packets come into this register.
Singleton for each Protocol.
All Packets come into this register. Singleton for each Protocol.
"""
# Currencies
CC_EUR = [0x09, 0x78]
@ -57,7 +55,7 @@ class _PacketRegister:
# detects which class to use.
if is_stringlike(datastream):
# lets convert our string into a bytelist.
datastream = conv.toBytes(datastream[:2])
datastream = toBytes(datastream[:2])
# read the first two bytes of the stream.
cc, ci = datastream[:2]
# print '<| %s %s' % (hex(cc), hex(ci))
@ -77,12 +75,6 @@ class APDUPacket(object):
Goal is to not save any binary data in the instance anymore.
Translation from data to classes and vice versa should be fluent.
"""
class NotEnoughData(Exception):
""" raised if the apdu has not enough data to make sense """
pass
class IntegrityError(Exception):
pass
cmd_class = 0x6 # standard.
cmd_instr = None
allowed_bitmaps = None # None=All, [] = None.
@ -160,7 +152,7 @@ class APDUPacket(object):
val = self.fixed_values.get(self.fixed_arguments[i], None)
if val:
if is_stringlike(val):
val = conv.toBytes(val)
val = toBytes(val)
elif isinstance(val, list):
pass
else:
@ -170,9 +162,7 @@ class APDUPacket(object):
return ds
def introspect_fixed(self):
"""
return a description of your fixed data.
"""
"""Return a description of your fixed data."""
return self.fixed_values
def get_data(self):
@ -197,14 +187,14 @@ class APDUPacket(object):
#############################################
def consume_fixed(self, data, length):
"""
Overwrite this Function for your Packet to consume fixed
arguments not represented by bitmaps.
This data usually comes before any bitmaps are present
and each packet has to know for itself, how to handle them.
Overwrite this Function for your Packet to consume fixed
arguments not represented by bitmaps.
This data usually comes before any bitmaps are present
and each packet has to know for itself, how to handle them.
data is the whole packet data after the length part
data is the whole packet data after the length part
length is the given data-length coded into the packet.
length is the given data-length coded into the packet.
"""
# consume all fixed arguments from data here.
# this might be very different from packet to packet.
@ -231,8 +221,7 @@ class APDUPacket(object):
if len(blob) >= pos + length:
data = blob[pos:pos + length]
else:
raise self.NotEnoughData(
"Not enough Data to create the packet data.")
raise NotEnoughData('Not enough Data to create the packet data.')
# step 1: fixed arguments.
# if this packet has some fixed arguments, they have to be
# parsed first.
@ -245,10 +234,10 @@ class APDUPacket(object):
data = property(get_data, set_data)
@classmethod
def parse(cls, blob=""):
def parse(cls, blob=''):
if is_stringlike(blob):
# lets convert our string into a bytelist.
blob = conv.toBytes(blob)
blob = toBytes(blob)
if isinstance(blob, list):
# allright.
# first we detect our packetclass

View file

@ -1,6 +1,7 @@
import datetime
from ecrterm import common, conv
from ecrterm.common import ERRORCODES, INTERMEDIATE_STATUS_CODES
from ecrterm.conv import bs2hl, toHexString
from ecrterm.packets import bmp
from ecrterm.packets.apdu import APDUPacket, Packets
from ecrterm.packets.bmp import BCD
@ -25,8 +26,8 @@ class Packet(APDUPacket):
else:
introspection = '**%s' % bitmap_stati
return "%s{%s %s} %s" % (
self.__class__.__name__, conv.toHexString([self.cmd_class]),
conv.toHexString([self.cmd_instr]), introspection)
self.__class__.__name__, toHexString([self.cmd_class]),
toHexString([self.cmd_instr]), introspection)
def _handle_unknown_response(self, response, tm):
print("Unknown packet response %s" % response)
@ -97,7 +98,7 @@ class Registration(Packet):
cmd_instr = 0x0
fixed_arguments = ['password', 'config_byte', 'cc']
fixed_values = {
'password': '123456', 'config_byte': 0xBE, 'cc': Packets.CC_EUR}
'password': '123456', 'config_byte': 0xBA, 'cc': Packets.CC_EUR}
wait_for_completion = True
def validate(self):
@ -117,7 +118,7 @@ class Registration(Packet):
# only password and byte
# no cc
self.fixed_values['password'] = ''.join(
[conv.toHexString([c]) for c in data[0:3]])
[toHexString([c]) for c in data[0:3]])
self.fixed_values['config_byte'] = data[3]
if length >= 6:
self.fixed_values['cc'] = data[4:6]
@ -195,7 +196,7 @@ class Kassenbericht(Packet):
def consume_fixed(self, data, length):
if length >= 3:
self.fixed_values['password'] = ''.join(
[conv.toHexString([c]) for c in data[0:3]])
[toHexString([c]) for c in data[0:3]])
return data[3:]
return []
@ -212,7 +213,7 @@ class EndOfDay(Packet):
def consume_fixed(self, data, length):
if length >= 3:
self.fixed_values['password'] = ''.join(
[conv.toHexString([c]) for c in data[0:3]])
[toHexString([c]) for c in data[0:3]])
return data[3:]
return []
@ -242,7 +243,7 @@ class Initialisation(Packet):
def consume_fixed(self, data, length):
if length == 3:
self.fixed_values['password'] = ''.join(
[conv.toHexString([c]) for c in data[0:3]])
[toHexString([c]) for c in data[0:3]])
return []
@ -401,24 +402,24 @@ class StatusInformation(Packet):
BCD.as_int(BCD.decode_bcd(totals_list[0:2])),
'receipt-number-end':
BCD.as_int(BCD.decode_bcd(totals_list[2:4])),
'number-ec-card': conv.bs2hl(totals_list[4])[0],
'number-ec-card': bs2hl(totals_list[4])[0],
'turnover-ec-card':
BCD.as_int(BCD.decode_bcd(totals_list[5:5 + 6])),
'number-jcb': conv.bs2hl(totals_list[11])[0],
'number-jcb': bs2hl(totals_list[11])[0],
'turnover-jcb': BCD.as_int(BCD.decode_bcd(totals_list[12:12 + 6])),
'number-eurocard': conv.bs2hl(totals_list[18])[0],
'number-eurocard': bs2hl(totals_list[18])[0],
'turnover-eurocard':
BCD.as_int(BCD.decode_bcd(totals_list[19:19 + 6])),
'number-amex': conv.bs2hl(totals_list[25])[0],
'number-amex': bs2hl(totals_list[25])[0],
'turnover-amex':
BCD.as_int(BCD.decode_bcd(totals_list[26:26 + 6])),
'number-visa': conv.bs2hl(totals_list[32])[0],
'number-visa': bs2hl(totals_list[32])[0],
'turnover-visa':
BCD.as_int(BCD.decode_bcd(totals_list[33:33 + 6])),
'number-diners': conv.bs2hl(totals_list[39])[0],
'number-diners': bs2hl(totals_list[39])[0],
'turnover-diners':
BCD.as_int(BCD.decode_bcd(totals_list[40:40 + 6])),
'number-remaining': conv.bs2hl(totals_list[46])[0],
'number-remaining': bs2hl(totals_list[46])[0],
'turnover-remaining':
BCD.as_int(BCD.decode_bcd(totals_list[47:47 + 6])),
'amount': int(bdict['amount'].value()),
@ -471,7 +472,7 @@ class IntermediateStatusInformation(Packet):
def __repr__(self):
return "IntermediateStatus{04 FF}: %s" % (
common.INTERMEDIATE_STATUS_CODES.get(
INTERMEDIATE_STATUS_CODES.get(
self.fixed_values.get('intermediate_status', None),
'No status'))
@ -511,8 +512,8 @@ class PacketReceivedError(Packet):
def __repr__(self):
return "PacketReceivedERROR{84 %s}: %s" % (
conv.toHexString([self.error_code]),
common.ERRORCODES.get(self.error_code, 'Unknown Error'),
toHexString([self.error_code]),
ERRORCODES.get(self.error_code, 'Unknown Error'),
)
@ -560,7 +561,7 @@ class PrintLine(Packet):
def enrich_fixed(self):
# take attribute first
bs = [self.fixed_values.get('attribute', 0)]
bs += conv.bs2hl(self.fixed_values.get('text', ''))
bs += bs2hl(self.fixed_values.get('text', ''))
return bs
@ -672,7 +673,7 @@ class StatusEnquiry(Packet):
def consume_fixed(self, data, length):
if length:
self.fixed_values['password'] = ''.join(
[conv.toHexString([c]) for c in data[0:3]])
[toHexString([c]) for c in data[0:3]])
return data[3:]

View file

@ -1,71 +1,97 @@
# -*- coding: utf-8 -*-
"""
Implements all known Bitmaps in two Dictionaries:
BITMAPS is keyed after the CODE of the bitmap in the protocol.
use this if you know "the code" of the bitmap and retrieve its class,
keyname (and description)
BITMAPS_ARGS is created from BITMAPS, representing bitmaps in a
key-value-store. use this if you know "the key name" of the bitmap
and want to get its class, code (and description)
Implements all known Bitmaps in two Dictionaries:
BITMAPS is keyed after the CODE of the bitmap in the protocol.
use this if you know "the code" of the bitmap and retrieve its class,
keyname (and description)
BITMAPS_ARGS is created from BITMAPS, representing bitmaps in a
key-value-store. use this if you know "the key name" of the bitmap
and want to get its class, code (and description)
"""
from ecrterm.packets.bmp import BMP
BITMAPS = {
0x01: (BMP.FormatByte(1), 'timeout', "binary time-out"),
0x02: (BMP.FormatByte(1), 'max_status_infos', "binary max.status infos"),
0x03: (BMP.FormatByte(1), 'service_byte', "binary service-byte"),
0x04: (BMP.FormatBCDByte(6), 'amount', "Amount"),
0x05: (BMP.FormatByte(1), 'pump_nr', "binary pump-Nr."),
0x06: (BMP.FormatTLV(), 'tlv', "TLV"),
0x0B: (BMP.FormatBCDByte(3), 'trace_number', "trace-number"),
0x0C: (BMP.FormatBCDByte(3), 'time', "Time"),
0x0D: (BMP.FormatBCDByte(2), 'date_day', "date, MM DD (see AA)"),
0x0E: (BMP.FormatBCDByte(2), 'card_expire', "expiry-date, YY MM"),
0x17: (BMP.FormatBCDByte(2), 'card_sequence_number', "card sequence-number"),
0x19: (BMP.FormatByte(1), 'type', "binary status-byte/payment-type/card-type"),
0x22: (BMP.FormatLLVAR(), 'card_number', "card_number, PAN / EF_ID, 'E' used to indicate masked numeric digit"),
0x23: (BMP.FormatLLVAR(), 'track_2', "track 2 data, 'E' used to indicate masked numeric digit1"),
0x24: (BMP.FormatLLLVAR(), 'track_3', "track 3 data, 'E' used to indicate masked numeric digit1"),
0x27: (BMP.FormatByte(1), 'result_code', "binary result-code"),
0x29: (BMP.FormatBCDByte(4), 'tid', "TID"),
0x2A: (BMP.FormatByte(15), 'vu', "ASCII VU-number"),
0x2D: (BMP.FormatLLVAR(), 'track_1', "track 1 data"),
0x2E: (BMP.FormatLLLVAR(), 'sync_chip_data', "sychronous chip data"),
0x37: (BMP.FormatBCDByte(3), 'trace_number_original', "trace-number of the original transaction for reversal"),
0x3A: (BMP.FormatBCDByte(2), 'cvv', 'the field cvv is optionally used for mail order'),
0x3B: (BMP.FormatByte(8), 'aid', "AID authorisation-attribute"),
0x3C: (BMP.FormatLLLVAR(), 'additional', "additional-data/additional-text"),
0x3D: (BMP.FormatBCDByte(3), 'password', "Password"),
0x49: (BMP.FormatBCDByte(2), 'currency_code', "currency code"),
0x60: (BMP.FormatLLLVAR(), 'totals', "individual totals"),
0x87: (BMP.FormatBCDByte(2), 'receipt', "receipt-number"),
0x88: (BMP.FormatBCDByte(3), 'turnover', "turnover record number"),
0x8A: (BMP.FormatByte(1), 'card_type', "binary card-type (card-number according to ZVT-protocol; comparison 8C)"),
0x8B: (BMP.FormatLLVAR(), 'card_name', "card-name"),
0x8C: (BMP.FormatByte(1), 'card_operator', "binary card-type-ID of the network operator (comparison 8A)"),
0x92: (BMP.FormatLLLVAR(), 'offline_chip', "additional-data ec-Cash with chip offline"),
0x9A: (BMP.FormatLLLVAR(), 'geldkarte', "Geldkarte payments-/ failed-payment record/total record Geldkarte"),
0xA0: (BMP.FormatByte(1), 'result_code_as', "binary result-code-AS"),
0xA7: (BMP.FormatLLVAR(), 'chip_ef_id', "chip-data, EF_ID"),
0xAA: (BMP.FormatBCDByte(3), 'date', "date YY MM DD (see 0D)"),
0xAF: (BMP.FormatLLLVAR(), 'ef_info', "EF_Info"),
0xBA: (BMP.FormatByte(5), 'aid_param', "binary AID-parameter"),
0xD0: (BMP.FormatByte(1), 'algo_key', "binary algorithm-Key"),
0xD1: (BMP.FormatLLVAR(), 'offset', "card offset/PIN-data"),
0xD2: (BMP.FormatByte(1), 'direction', "binary direction"),
0xD3: (BMP.FormatByte(1), 'key_position', "binary key-position"),
0xE0: (BMP.FormatByte(1), 'input_min', "binary min. length of the input"),
0xE1: (BMP.FormatLLVAR(), 'iline1', "text2 line 1"),
0xE2: (BMP.FormatLLVAR(), 'iline2', "text2 line 2"),
0xE3: (BMP.FormatLLVAR(), 'iline3', "text2 line 3"),
0xE4: (BMP.FormatLLVAR(), 'iline4', "text2 line 4"),
0xE5: (BMP.FormatLLVAR(), 'iline5', "text2 line 5"),
0xE6: (BMP.FormatLLVAR(), 'iline6', "text2 line 6"),
0xE7: (BMP.FormatLLVAR(), 'iline7', "text2 line 7"),
0xE8: (BMP.FormatLLVAR(), 'iline8', "text2 line 8"),
0xE9: (BMP.FormatByte(1), 'max_input_length', "binary max. length of the input"),
0x01: (BMP.FormatByte(1), 'timeout', 'binary time-out'),
0x02: (BMP.FormatByte(1), 'max_status_infos', 'binary max.status infos'),
0x03: (BMP.FormatByte(1), 'service_byte', 'binary service-byte'),
0x04: (BMP.FormatBCDByte(6), 'amount', 'Amount'),
0x05: (BMP.FormatByte(1), 'pump_nr', 'binary pump-Nr.'),
0x06: (BMP.FormatTLV(), 'tlv', 'TLV'),
0x0B: (BMP.FormatBCDByte(3), 'trace_number', 'trace-number'),
0x0C: (BMP.FormatBCDByte(3), 'time', 'Time'),
0x0D: (BMP.FormatBCDByte(2), 'date_day', 'date, MM DD (see AA)'),
0x0E: (BMP.FormatBCDByte(2), 'card_expire', 'expiry-date, YY MM'),
0x17: (
BMP.FormatBCDByte(2), 'card_sequence_number', 'card sequence-number'),
0x19: (
BMP.FormatByte(1), 'type',
'binary status-byte/payment-type/card-type'),
0x22: (
BMP.FormatLLVAR(), 'card_number',
'card_number, PAN / EF_ID, \'E\' used to indicate masked numeric '
'digit'),
0x23: (
BMP.FormatLLVAR(), 'track_2',
'track 2 data, \'E\' used to indicate masked numeric digit1'),
0x24: (
BMP.FormatLLLVAR(), 'track_3',
'track 3 data, \'E\' used to indicate masked numeric digit1'),
0x27: (BMP.FormatByte(1), 'result_code', 'binary result-code'),
0x29: (BMP.FormatBCDByte(4), 'tid', 'TID'),
0x2A: (BMP.FormatByte(15), 'vu', 'ASCII VU-number'),
0x2D: (BMP.FormatLLVAR(), 'track_1', 'track 1 data'),
0x2E: (BMP.FormatLLLVAR(), 'sync_chip_data', 'sychronous chip data'),
0x37: (
BMP.FormatBCDByte(3), 'trace_number_original',
'trace-number of the original transaction for reversal'),
0x3A: (
BMP.FormatBCDByte(2), 'cvv',
'the field cvv is optionally used for mail order'),
0x3B: (BMP.FormatByte(8), 'aid', 'AID authorisation-attribute'),
0x3C: (
BMP.FormatLLLVAR(), 'additional', 'additional-data/additional-text'),
0x3D: (BMP.FormatBCDByte(3), 'password', 'Password'),
0x49: (BMP.FormatBCDByte(2), 'currency_code', 'currency code'),
0x60: (BMP.FormatLLLVAR(), 'totals', 'individual totals'),
0x87: (BMP.FormatBCDByte(2), 'receipt', 'receipt-number'),
0x88: (BMP.FormatBCDByte(3), 'turnover', 'turnover record number'),
0x8A: (
BMP.FormatByte(1), 'card_type',
'binary card-type (card-number according to ZVT-protocol; comparison '
'8C)'),
0x8B: (BMP.FormatLLVAR(), 'card_name', 'card-name'),
0x8C: (
BMP.FormatByte(1), 'card_operator',
'binary card-type-ID of the network operator (comparison 8A)'),
0x92: (
BMP.FormatLLLVAR(), 'offline_chip',
'additional-data ec-Cash with chip offline'),
0x9A: (
BMP.FormatLLLVAR(), 'geldkarte',
'Geldkarte payments-/ failed-payment record/total record Geldkarte'),
0xA0: (BMP.FormatByte(1), 'result_code_as', 'binary result-code-AS'),
0xA7: (BMP.FormatLLVAR(), 'chip_ef_id', 'chip-data, EF_ID'),
0xAA: (BMP.FormatBCDByte(3), 'date', 'date YY MM DD (see 0D)'),
0xAF: (BMP.FormatLLLVAR(), 'ef_info', 'EF_Info'),
0xBA: (BMP.FormatByte(5), 'aid_param', 'binary AID-parameter'),
0xD0: (BMP.FormatByte(1), 'algo_key', 'binary algorithm-Key'),
0xD1: (BMP.FormatLLVAR(), 'offset', 'card offset/PIN-data'),
0xD2: (BMP.FormatByte(1), 'direction', 'binary direction'),
0xD3: (BMP.FormatByte(1), 'key_position', 'binary key-position'),
0xE0: (BMP.FormatByte(1), 'input_min', 'binary min. length of the input'),
0xE1: (BMP.FormatLLVAR(), 'iline1', 'text2 line 1'),
0xE2: (BMP.FormatLLVAR(), 'iline2', 'text2 line 2'),
0xE3: (BMP.FormatLLVAR(), 'iline3', 'text2 line 3'),
0xE4: (BMP.FormatLLVAR(), 'iline4', 'text2 line 4'),
0xE5: (BMP.FormatLLVAR(), 'iline5', 'text2 line 5'),
0xE6: (BMP.FormatLLVAR(), 'iline6', 'text2 line 6'),
0xE7: (BMP.FormatLLVAR(), 'iline7', 'text2 line 7'),
0xE8: (BMP.FormatLLVAR(), 'iline8', 'text2 line 8'),
0xE9: (
BMP.FormatByte(1), 'max_input_length',
"binary max. length of the input"),
0xEA: (BMP.FormatByte(1), 'input_echo', "binary echo the Input"),
0xEB: (BMP.FormatByte(8), 'mac', "binary MAC over text 1 and text 2"),
0xF0: (BMP.FormatByte(1), 'display_duration', "binary display-duration"),
@ -79,7 +105,9 @@ BITMAPS = {
0xF8: (BMP.FormatLLVAR(), 'line8', "text1 line 8"),
0xF9: (BMP.FormatByte(1), 'beeps', "binary number of beep-tones"),
0xFA: (BMP.FormatByte(1), 'status', "binary status"),
0xFB: (BMP.FormatByte(1), 'ok_required', "binary confirmation the input with <OK> required"),
0xFB: (
BMP.FormatByte(1), 'ok_required',
"binary confirmation the input with <OK> required"),
0xFC: (BMP.FormatByte(1), 'dialog_control', "binary dialog-control"),
}

View file

@ -1,24 +1,22 @@
# -*- coding: utf-8 -*-
"""
Implementation of the Bitmap Variable Layer
Each variable in the protocol is saved into a bitmap.
"""
import struct
import sys
from struct import pack
from six.moves import range
from ecrterm import conv
from ecrterm.common import Dumpling
from ecrterm.utils import is_stringlike
if sys.version_info[0] == 2:
range = xrange
def int_word_split(x, endian='>'): # default big endian.
""" splits 2byte integer (sometimes called a word) into 2 byte list"""
return conv.bs2hl(struct.pack('%sH' % endian, x & 0xFFFF))
return conv.bs2hl(pack('%sH' % endian, x & 0xFFFF))
class BMPFactory(Dumpling):
@ -124,7 +122,7 @@ class BMP(BMPFactory):
"""
Each digit in the number is broken up and added to FACTOR sequentially.
Note: x has to be a number.
>>> [ hex(i) for i in BMP.encode_fcd( 1234 ) ]
>>> [ hex(i) for i in BMP.encode_fcd( 1234 ) ]
['0xf1', '0xf2', '0xf3', '0xf4']
"""
return [factor + int(i) for i in list(str(int(x)))]
@ -186,13 +184,13 @@ class LVAR(BMP):
ret = [self._id]
lines = [self._data, ]
for line in lines:
l = LVAR.length(len(line))
while len(l) < self.LL:
l = [0xF0] + l
length = LVAR.length(len(line))
while len(length) < self.LL:
length = [0xF0] + length
if is_stringlike(line):
ret += l + conv.bs2hl(line)
ret += length + conv.bs2hl(line)
elif isinstance(line, list):
ret += l + line
ret += length + line
else:
raise TypeError(
"Line has unsupported type in LVAR: %s" % type(line))
@ -203,12 +201,12 @@ class LVAR(BMP):
do the exact opposite of dump.
"""
# read the length
l = data[:self.LL]
l = BMP.decode_fcd(l)
length = data[:self.LL]
length = BMP.decode_fcd(length)
# get the data
data = data[self.LL:]
self._data = data[:l] # conversion of any kinds ?
return data[l:] # we return the rest.
self._data = data[:length] # conversion of any kinds ?
return data[length:] # we return the rest.
@classmethod
def length(cls, length):

View file

@ -1,22 +1,20 @@
# -*- coding: utf-8 -*-
"""
TLV Container Format.
TAG FIELD:
# first byte:
# b8,b7: class visibility
# b6: data object constructed?
# b5-b2: tag number.
# b1: if 1, continue at next byte.
# next byte:
b8: last byte?
... number.
LENGTH:
# b8 if 0, length is a 7 bit number.
# b8 if 1, this byte only codes how many follow.
TLV Container Format.
TAG FIELD:
# first byte:
# b8,b7: class visibility
# b6: data object constructed?
# b5-b2: tag number.
# b1: if 1, continue at next byte.
# next byte:
b8: last byte?
... number.
LENGTH:
# b8 if 0, length is a 7 bit number.
# b8 if 1, this byte only codes how many follow.
"""
from ecrterm.packets.bmp import BMP
@ -26,8 +24,8 @@ class TLV(BMP):
@classmethod
def length(cls, length):
""" transforms a number into a TLV Length
returns list of bytes
"""
Transforms a number into a TLV Length ,returns list of bytes.
"""
if length >= 0x80: # 128 or more...
# we need more than 1 byte.

21
ecrterm/requirements.txt Normal file
View file

@ -0,0 +1,21 @@
backcall==0.1.0
decorator==4.3.0
flake8==3.5.0
ipython==6.3.1
ipython-genutils==0.2.0
isort==4.3.4
jedi==0.12.0
mccabe==0.6.1
parso==0.2.0
pexpect==4.5.0
pickleshare==0.7.4
prompt-toolkit==1.0.15
ptyprocess==0.5.2
pycodestyle==2.3.1
pyflakes==1.6.0
Pygments==2.2.0
pyserial==3.4
simplegeneric==0.8.1
six==1.11.0
traitlets==4.3.2
wcwidth==0.1.7

View file

@ -1,11 +0,0 @@
"""
Tests for the ecrterm package.
@author g4b
"""
import unittest
def suite():
suite = unittest.TestSuite()
suite.addTest()
return suite

View file

@ -13,21 +13,26 @@ Diese Tests sehen nach ob unsere Klassen dieselben binären daten
erzeugen.
"""
import unittest
from unittest import TestCase, main
from ecrterm import conv
from ecrterm.packets import *
from ecrterm.transmission import ACK, NAK, SerialMessage
from ecrterm.conv import toHexString
from ecrterm.packets.base_packets import (
Authorisation, Diagnosis, Initialisation, PacketReceived,
PacketReceivedError, PrintLine, Registration, ResetTerminal, ShowText,
StatusEnquiry)
from ecrterm.transmission.signals import ACK, NAK
from ecrterm.transmission.transport_serial import SerialMessage
def list_of_bytes(apdu):
sm = SerialMessage(apdu)
byte_list = sm.dump_message()
# return " ".join(["%02h" % i for i in byte_list])
return conv.toHexString(byte_list)
return toHexString(byte_list)
class TestCaseDataEncoding(unittest.TestCase):
class TestCaseDataEncoding(TestCase):
maxDiff = None
def setUp(self):
pass
@ -38,60 +43,65 @@ class TestCaseDataEncoding(unittest.TestCase):
def test_Anmeldung(self):
# Register Packet std.
data_expected = """10 02 06 00 06 12 34 56 BA 09 78 10 03 24 C3"""
data_expected = '10 02 06 00 06 12 34 56 BA 09 78 10 03 24 C3'
pk = Registration()
self.assertEqual(data_expected, list_of_bytes(pk))
#
def test_Initialisierung(self):
# Initialization Std.
data_expected = """10 02 06 93 03 12 34 56 10 03 CA A4"""
data_expected = '10 02 06 93 03 12 34 56 10 03 CA A4'
pk = Initialisation()
self.assertEqual(data_expected, list_of_bytes(pk))
#
def test_Zahlung_eccash(self):
# Authorisation
data_expected = """10 02 06 01 0A 04 00 00 00 01 10 10 00 49 09 78 10 03 F2 FF"""
data_expected = \
'10 02 06 01 0A 04 00 00 00 01 10 10 00 49 09 78 10 03 F2 FF'
pk = Authorisation(amount=11000,
currency_code=978)
self.assertEqual(data_expected, list_of_bytes(pk))
#
def test_packet_diagnosis(self):
# Diagnosis
data_expected = """10 02 06 70 00 10 03 D9 F9"""
data_expected = '10 02 06 70 00 10 03 D9 F9'
pk = Diagnosis()
self.assertEqual(data_expected, list_of_bytes(pk))
def test_packet_printline(self):
# Print Line Packet
data_expected = """10 02 06 D1 19 00 47 65 73 61 6D 74 20 20 20 20 20 20 30 20 20 20 20 20 20 20 30 2C 30 30 10 03 B5 AB"""
pk = PrintLine(text='Gesamt 0 0,00',
attribute=0)
data_expected = \
'10 02 06 D1 19 00 47 65 73 61 6D 74 20 20 20 20 20 20 30 20 20 ' \
'20 20 20 20 20 30 2C 30 30 10 03 B5 AB'
pk = PrintLine(
text='Gesamt 0 0,00', attribute=0)
self.assertEqual(data_expected, list_of_bytes(pk))
def test_packet_received(self):
data_expected = """10 02 80 00 00 10 03 F5 1F"""
data_expected = '10 02 80 00 00 10 03 F5 1F'
pk = PacketReceived()
self.assertEqual(data_expected, list_of_bytes(pk))
def test_packet_received_error(self):
data_expected = """10 02 84 9C 00 10 03 C3 41"""
data_expected = '10 02 84 9C 00 10 03 C3 41'
pk = PacketReceivedError()
pk.set_error_code(0x9c)
self.assertEqual(data_expected, list_of_bytes(pk))
def test_packet_resetterminal(self):
data_expected = """10 02 06 18 00 10 03 56 3A"""
data_expected = '10 02 06 18 00 10 03 56 3A'
pk = ResetTerminal()
self.assertEqual(data_expected, list_of_bytes(pk))
def test_packet_showtext(self):
data_expected = """10 02 06 E0 25 F2 F1 F5 45 49 4E 47 45 42 45 4E 20 55 4E 44 20 4F 4B F1 F1 F6 46 41 48 52 45 52 4E 55 4D 4D 45 52 20 20 20 20 10 03 5A BA"""
data_expected = \
'10 02 06 E0 25 F2 F1 F5 45 49 4E 47 45 42 45 4E 20 55 4E 44 20 ' \
'4F 4B F1 F1 F6 46 41 48 52 45 52 4E 55 4D 4D 45 52 20 20 20 20 ' \
'10 03 5A BA'
lines = ['FAHRERNUMMER ', 'EINGEBEN UND OK', ]
# F1 F1 F6 46 41 48 52 45 52 4E 55 4D 4D 45 52 20 20 20 20 //FAHRERNUMMER
# F2 F1 F5 45 49 4E 47 45 42 45 4E 20 55 4E 44 20 4F 4B//EINGEBEN UND OK
# F1 F1 F6 46 41 48 52 45 52 4E 55 4D 4D 45 52 20 20 20 20
# //FAHRERNUMMER
# F2 F1 F5 45 49 4E 47 45 42 45 4E 20 55 4E 44 20 4F 4B
# //EINGEBEN UND OK
pk = ShowText(
# display_duration=0,
line1=lines[0],
@ -101,10 +111,10 @@ class TestCaseDataEncoding(unittest.TestCase):
self.assertEqual(data_expected, list_of_bytes(pk))
def test_packet_statusenquiry(self):
data_expected = """10 02 05 01 03 12 34 56 10 03 E0 43"""
data_expected = '10 02 05 01 03 12 34 56 10 03 E0 43'
pk = StatusEnquiry()
self.assertEqual(data_expected, list_of_bytes(pk))
if __name__ == '__main__':
unittest.main()
main()

View file

@ -4,15 +4,12 @@ Misc. Tests.
@author g4b
"""
import unittest
from unittest import TestCase, main
from ecrterm.packets.bmp import *
#import sys
#sys.path.insert(0, '..')
from ecrterm.packets.bmp import BCD
class TestSequenceFunctions(unittest.TestCase):
class TestSequenceFunctions(TestCase):
def setUp(self):
pass
@ -66,4 +63,4 @@ class TestSequenceFunctions(unittest.TestCase):
if __name__ == '__main__':
unittest.main()
main()

View file

@ -3,19 +3,16 @@
Incoming Packets should be always parsable.
this test tries to look at the parser in detail.
"""
import logging
import sys
import unittest
from logging import info
from unittest import TestCase, main
from ecrterm import conv
from ecrterm.ecr import parse_represented_data
from ecrterm.packets import *
from ecrterm.packets.bmp import *
#sys.path.insert(0, '..')
from ecrterm.packets.apdu import Packets
from ecrterm.packets.base_packets import Completion, Packet
class TestParsingMechanisms(unittest.TestCase):
class TestParsingMechanisms(TestCase):
def setUp(self):
pass
@ -32,35 +29,41 @@ class TestParsingMechanisms(unittest.TestCase):
packet)
def test_version_completion(self):
# following completion is sent by the PT with version on statusenquiry:
data_expected = """10 02 06 0F 0B F0 F0 F7 32 2E 31 34 2E 31 35 00 10 03 B1 11"""
# small test to test the completion with software version to be recognized.
# following completion is sent by the PT with version on
# statusenquiry:
data_expected = \
'10 02 06 0F 0B F0 F0 F7 32 2E 31 34 2E 31 35 00 10 03 B1 11'
# small test to test the completion with software version to be
# recognized.
rep = parse_represented_data(data_expected)
self.assertEqual(rep.__class__, Completion)
def test_parsing_two(self):
"""
parse some packets
- from the tutorial
- from complicated scenarios
- from failing parsings
and tell me if they are understood:
parse some packets
- from the tutorial
- from complicated scenarios
- from failing parsings
and tell me if they are understood:
"""
PACKETS = [
# 06 D1
'10 02 06 D1 17 00 20 20 20 20 20 20 20 20 20 4B 61 73 73 65 6E 73 63 68 6E 69 74 74 10 03 2F 07',
'10 02 06 D1 17 00 20 20 20 20 20 20 20 20 20 4B 61 73 73 65 6E '
'73 63 68 6E 69 74 74 10 03 2F 07',
# 04 0F
'10 02 04 0F 37 27 00 04 00 00 00 00 40 00 49 09 78 0C 09 38 48 0D 04 25 22 F1 F1 59 66 66 66 66'\
'D2 00 21 22 01 00 17 00 01 87 01 75 0B 61 39 95 19 40 29 60 09 99 14 0E 05 12 8A 02 10 03 90 8C',
'10 02 04 0F 37 27 00 04 00 00 00 00 40 00 49 09 78 0C 09 38 48 '
'0D 04 25 22 F1 F1 59 66 66 66 66 D2 00 21 22 01 00 17 00 01 87 '
'01 75 0B 61 39 95 19 40 29 60 09 99 14 0E 05 12 8A 02 10 03 90 '
'8C',
]
i = 0
idx = 0
for packet in PACKETS:
rep = parse_represented_data(packet)
logging.info(rep)
info(rep)
if not isinstance(rep, Packet):
raise AssertionError("Packet could not be parsed: #%s" % i)
i += 1
raise AssertionError("Packet could not be parsed: #%s" % idx)
idx += 1
if __name__ == '__main__':
unittest.main()
main()

View file

@ -1,18 +1,14 @@
# -*- coding: utf-8 -*-
"""
Transmission.
You could say, this is the application layer of the ZVT Protocol
Transmission regulates the packetflow and the rules where the packets go.
It uses a Transport (SerialTransport) to send its data, and you feed
it with a packet through the transmit() method. All further communication
is done in the packets.
@author g4b
"""
Transmission.
from ecrterm.transmission import signals, zvt
from ecrterm.transmission._transmission import *
from ecrterm.transmission.transport_serial import SerialTransport
You could say, this is the application layer of the ZVT Protocol
Transmission regulates the packetflow and the rules where the packets go.
It uses a Transport (SerialTransport) to send its data, and you feed
it with a packet through the transmit() method. All further communication
is done in the packets.
@author g4b
"""

View file

@ -2,13 +2,10 @@
Transmission Basics.
@author g4b
"""
from ecrterm import common
from ecrterm.exceptions import TransmissionException, TransportLayerException
from ecrterm.packets.base_packets import PacketReceived
from ecrterm.transmission.signals import *
class TransmissionException(common.ApplicationLayerException):
pass
from ecrterm.transmission.signals import (
TIMEOUT_T4_DEFAULT, TRANSMIT_ERROR, TRANSMIT_OK, TRANSMIT_TIMEOUT)
class Transmission(object):
@ -67,7 +64,7 @@ class Transmission(object):
"""
if not self.is_master or self.is_waiting:
raise TransmissionException(
"Can't send until transmisson is ready")
'Can\'t send until transmisson is ready')
self.is_master = False
self.last = packet
try:
@ -84,7 +81,7 @@ class Transmission(object):
success, response = self.transport.receive(
self.actual_timeout)
history += [(True, response)]
except common.TransportLayerException:
except TransportLayerException:
# some kind of timeout.
# if we are already master, we can bravely ignore this.
if self.is_master:
@ -96,7 +93,7 @@ class Transmission(object):
# we actually have to handle a last packet
stay_master = self.handle_packet_response(
packet, response)
print("Is Master Read Ahead happened.")
print('Is Master Read Ahead happened.')
self.is_master = stay_master
except Exception as e:
self.is_master = True

View file

@ -1,21 +1,21 @@
# -*- coding: utf-8 -*-
"""
Serial Layer
The Serial Layer is a transport used for
@author g4b
"""
Serial Layer
import os # @UnresolvedImport
import select
import time
The Serial Layer is a transport used for
@author g4b
"""
import serial
from ecrterm import common, conv, crc
from ecrterm.common import Transport
from ecrterm.conv import bs2hl, hl2bs, toBytes, toHexString
from ecrterm.crc import crc_xmodem16
from ecrterm.exceptions import (
TransportLayerException, TransportTimeoutException)
from ecrterm.packets.apdu import APDUPacket
from ecrterm.transmission.signals import *
from ecrterm.transmission.signals import (
ACK, DLE, ETX, NAK, STX, TIMEOUT_T1, TIMEOUT_T2)
from ecrterm.utils import ensure_bytes, is_stringlike
SERIAL_DEBUG = False
@ -24,11 +24,11 @@ SERIAL_DEBUG = False
def std_serial_log(instance, data, incoming=False):
try:
if is_stringlike(incoming):
data = conv.bs2hl(data)
data = bs2hl(data)
if incoming:
print("< %s" % conv.toHexString(data))
print("< %s" % toHexString(data))
else:
print("> %s" % conv.toHexString(data))
print("> %s" % toHexString(data))
except:
print("| error in log")
@ -48,16 +48,15 @@ class SerialMessage(object):
def __init__(self, apdu=None):
if is_stringlike(apdu):
# try to get the list of bytes.
apdu = conv.toBytes(apdu.replace(' ', ''))
apdu = toBytes(apdu.replace(' ', ''))
elif isinstance(apdu, APDUPacket):
apdu = apdu.to_list()
self.apdu = apdu
def _get_crc(self):
data = conv.hl2bs(self.apdu + [ETX])
# print "crc for %s => %s" % ([hex(i) for i in self.apdu], hex(crc.crc_xmodem16(data)))
data = hl2bs(self.apdu + [ETX])
try:
return crc.crc_xmodem16(data)
return crc_xmodem16(data)
except:
print(self.apdu)
raise
@ -89,7 +88,7 @@ class SerialMessage(object):
def __repr__(self):
return "SerialMessage (APDU: %s, CRC-L: %s CRC-H: %s)" % (
conv.toHexString(self.apdu),
toHexString(self.apdu),
hex(self.crc_l),
hex(self.crc_h))
@ -99,10 +98,10 @@ class SerialMessage(object):
return [DLE, STX] + apdu + [DLE, ETX, self.crc_l, self.crc_h]
def as_bin(self):
return conv.hl2bs(self.dump_message())
return hl2bs(self.dump_message())
class SerialTransport(common.Transport):
class SerialTransport(Transport):
SerialCls = serial.Serial
slog = noop
@ -121,7 +120,7 @@ class SerialTransport(common.Transport):
xonxoff=0, # disable software flow control
rtscts=0, # disable RTS/CTS flow control
)
if ser.isOpen() == False:
if not ser.isOpen():
ser.open()
# 8< got that from somwhere, not sure what it does:
ser.setRTS(1)
@ -146,7 +145,7 @@ class SerialTransport(common.Transport):
def write(self, something=None):
if something:
try:
self.slog(conv.bs2hl(something))
self.slog(bs2hl(something))
finally:
self.connection.write(ensure_bytes(something)) # !?
@ -167,19 +166,20 @@ class SerialTransport(common.Transport):
"""
reads a message packet. any errors are raised directly.
"""
# if in 5 seconds no message appears, we respond with a nak and raise an error.
# if in 5 seconds no message appears, we respond with a nak and
# raise an error.
self.connection.timeout = timeout
apdu = []
crc = None
header = self.connection.read(2)
header = conv.bs2hl(header)
header = bs2hl(header)
# test if there was a transmission:
if header == []:
raise common.TransportLayerException('Reading Header Timeout')
raise TransportLayerException('Reading Header Timeout')
# test our header to be valid
if header != [DLE, STX]:
self.slog(header, True)
raise common.TransportLayerException("Header Error: %s" % header)
raise TransportLayerException("Header Error: %s" % header)
# read until DLE, ETX is reached.
dle = False
# timeout to T1 after header.
@ -188,17 +188,17 @@ class SerialTransport(common.Transport):
b = ord(self.connection.read(1)) # read a byte.
if b is None:
# timeout
raise common.TransportLayerException(
raise TransportLayerException(
"Timeout T1 reading stream.")
if b == ETX and dle:
# dle was set, and this is ETX, so we are at the end.
# we read the CRC now.
crc = self.connection.read(2)
if not crc:
raise common.TransportLayerException(
raise TransportLayerException(
"Timeout T1 reading CRC")
else:
crc = conv.bs2hl(crc)
crc = bs2hl(crc)
# and break
continue
elif b == DLE:
@ -212,7 +212,7 @@ class SerialTransport(common.Transport):
elif dle:
# dle was set, but we got no etx here.
# this seems to be an error.
raise common.TransportLayerException(
raise TransportLayerException(
"DLE without sense detected.")
# we add this byte to our apdu.
apdu += [b]
@ -274,15 +274,14 @@ class SerialTransport(common.Transport):
# if tries < 3:
# return self.send_message(message, tries + 1, no_answer)
# else:
raise common.TransportLayerException("Could not send message")
raise TransportLayerException('Could not send message')
elif not acknowledge:
# this happens quite a lot with the ingenico devices.
# possibly a workaround would be nice.
raise common.TransportTimeoutException(
"No Answer, Possible Timeout")
raise TransportTimeoutException('No Answer, Possible Timeout')
else:
raise common.TransportLayerException(
"Unknown Acknowledgment Byte %s" % conv.bs2hl(acknowledge))
raise TransportLayerException(
'Unknown Acknowledgment Byte %s' % bs2hl(acknowledge))
def send(self, apdu, tries=0, no_wait=False):
"""

View file

@ -1,16 +1,21 @@
# -*- coding: utf-8 -*-
from os import read as os_read
from select import select
import serial # @UnresolvedImport
from ecrterm.transmission.transport_serial import *
from ecrterm.transmission.transport_serial import SerialTransport
class SerialTransportUnbuffered(SerialTransport):
class UnbufferedSerial(serial.Serial):
""" override Serial.read to use the *unbuffered* read function """
"""Override Serial.read to use the *unbuffered* read function."""
def read(self, size=1, timeout=None):
"""Read size bytes from the serial port. If a timeout is set it may
return less characters as requested. With no timeout it will block
until the requested number of bytes is read."""
"""
Read size bytes from the serial port. If a timeout is set it
may return less characters as requested. With no timeout it
will block until the requested number of bytes is read.
"""
if self.fd is None:
raise serial.portNotOpenError
read = []
@ -23,10 +28,10 @@ class SerialTransportUnbuffered(SerialTransport):
if size > 0:
while nread < size:
# print "\tread(): size",size, "have", len(read) #debug
ready, _, _ = select.select(fds, [], [], timeout)
ready, _, _ = select(fds, [], [], timeout)
if not ready:
break # timeout
buf = os.read(fd, size - nread)
buf = os_read(fd, size - nread)
if not buf:
break # early abort on timeout or error
read.append(buf)

View file

@ -1,20 +1,21 @@
"""
Transmission Object for ZVT Protocol.
@author g4b
Transmission Object for ZVT Protocol.
@author g4b
"""
from ecrterm.packets.base_packets import PacketReceived, PacketReceivedError
from ecrterm.transmission._transmission import (
Transmission, TransmissionException)
from ecrterm.transmission.signals import *
from ecrterm.exceptions import TransmissionException, TransportLayerException
from ecrterm.packets.base_packets import PacketReceived
from ecrterm.transmission._transmission import Transmission
from ecrterm.transmission.signals import (
TIMEOUT_T4_DEFAULT, TRANSMIT_ERROR, TRANSMIT_OK, TRANSMIT_TIMEOUT)
class ZVTTransmission(Transmission):
"""
A Transmission Object represents an open connection between ECR and PT.
It regulates the flow of packets, and uses a Transport
to send its data.
to send its data.
The default Transport to use is the serial transport.
"""
actual_timeout = TIMEOUT_T4_DEFAULT
@ -28,14 +29,14 @@ class ZVTTransmission(Transmission):
def log_response(self, response):
"""
every response is saved into self.log_list.
hook this for live data.
Every response is saved into self.log_list.
Hook this for live data.
"""
self.log_list += [response]
def send_received(self):
"""
send the "Packet Received" Packet.
send the 'Packet Received' Packet.
"""
packet = PacketReceived()
self.history += [(False, packet), ]
@ -43,18 +44,18 @@ class ZVTTransmission(Transmission):
def handle_packet_response(self, packet, response):
"""
a shortcut for calling the handle_response of the packet.
A shortcut for calling the handle_response of the packet.
"""
return packet.handle_response(response, self)
def transmit(self, packet):
"""
Transmit the packet, go into slave mode and wait until the
whole sequence is finished.
Transmit the packet, go into slave mode and wait until the whole
sequence is finished.
"""
if not self.is_master or self.is_waiting:
raise TransmissionException(
"Can't send until transmisson is ready")
'Can\'t send until transmisson is ready')
self.is_master = False
try:
self.history += [(False, packet), ]
@ -69,7 +70,7 @@ class ZVTTransmission(Transmission):
success, response = self.transport.receive(
self.actual_timeout)
self.history += [(True, response)]
except common.TransportLayerException:
except TransportLayerException:
# some kind of timeout.
# if we are already master, we can bravely ignore this.
if self.is_master:
@ -81,7 +82,7 @@ class ZVTTransmission(Transmission):
# we actually have to handle a last packet
stay_master = self.handle_packet_response(
packet, response)
print("Is Master Read Ahead happened.")
print('Is Master Read Ahead happened.')
self.is_master = stay_master
except Exception as e:
print(e)

View file

@ -50,7 +50,8 @@ def detect_pt(device='/dev/ttyUSB0', timeout=2, silent=True,
try:
if not errors:
if isinstance(e.last.completion, packets.Completion):
return e.last.completion.fixed_values.get('sw-version', True) or True
return e.last.completion.fixed_values.get(
'sw-version', True) or True
return True
return False
finally: