AutoPEP8 all the things

This commit is contained in:
László Károlyi 2018-04-22 03:32:05 +02:00
parent 900e94cb30
commit 07f675f5d3
Signed by: karolyi
GPG key ID: 2DCAF25E55735BFE
15 changed files with 425 additions and 368 deletions

View file

@ -4,185 +4,185 @@
"""
INTERMEDIATE_STATUS_CODES = {
0x00: 'PT is waiting for amount - confirmation',
0x01: 'please watch PIN - Pad',
0x02: 'please watch PIN - Pads',
0x03: 'not accepted',
0x04: 'PT is waiting for response from FEP',
0x05: 'PT is sending auto - reversal',
0x06: 'PT is sending post - bookings',
0x07: 'card not admitted',
0x08: 'card unknown / undefined',
0x09: 'expired card',
0x0A: 'insert card',
0x0B: 'please remove card !',
0x0C: 'card not readable',
0x0D: 'processing error',
0x0E: 'please wait...',
0x0F: 'PT is commencing an automatic end - of - day batch',
0x10: 'invalid card',
0x11: 'balance display',
0x12: 'system malfunction',
0x13: 'payment not possible',
0x14: 'credit not sufficient',
0x15: 'incorrect PIN',
0x16: 'limit not sufficient',
0x17: 'please wait...',
0x18: 'PIN try limit exceeded',
0x19: 'card - data incorrect',
0x1A: 'service - mode',
0x1B: 'approved. please fill - up',
0x1C: 'approved. please take goods',
0x1D: 'declined',
0x26: 'PT is waiting for input of the mobile - number',
0x27: 'PT is waiting for repeat of mobile number',
0x41: 'please watch PIN - Pad please remove card !',
0x42: 'please watch PIN - Pad please remove card !',
0x43: 'not accepted, please remove card !',
0x44: 'PT is waiting for response from FEP please remove card !',
0x45: 'PT is sending auto - reversal please remove card !',
0x46: 'PT is sending post - booking please remove card !',
0x47: 'card not admitted please remove card !',
0x48: 'card unknown / undefined please remove card !',
0x49: 'expired card please remove card !',
0x4A: '',
0x4B: 'please remove card !',
0x4C: 'card not readable please remove card !',
0x4D: 'processing error please remove card !',
0x4E: 'please wait... please remove card !',
0x4F: 'PT is commencing an automatic end - of - day batch please remove card !',
0x50: 'invalid card please remove card !',
0x51: 'balance display please remove card !',
0x52: 'system malfunction please remove card !',
0x53: 'payment not possible please remove card !',
0x54: 'credit not sufficient please remove card !',
0x55: 'incorrect PIN please remove card !',
0x56: 'limit not sufficient please remove card !',
0x57: 'please wait... please remove card !',
0x58: 'PIN try limit exceeded please remove card !',
0x59: 'card - data incorrect please remove card !',
0x5A: 'service - mode please remove card !',
0x5B: 'approved. please fill - up please remove card !',
0x5C: 'approved. please take goods please remove card !',
0x5D: 'declined please remove card !',
0x66: 'PT is waiting for input of the mobil - number please remove card !',
0x67: 'PT is waiting for repeat of the mobil - number please remove card !',
0xC7: 'PT is waiting for input of the mileage',
0xC8: 'PT is waiting for cashier',
0xC9: 'PT is commencing an automatic diagnosis',
0xCA: 'PT is commencing an automatic initialisation',
0xCB: 'merchant - journal full',
0xCC: 'debit advice not possible, PIN required',
0xD2: 'connecting dial - up',
0xD3: 'dial - up connection made',
0xE0: 'PT is waiting for application - selection',
0xE1: 'PT is waiting for language - selection',
0xF1: 'offline',
0xF2: 'online',
0xF3: 'offline transaction',
0xFF: 'custom or unknown status.',
0x00: 'PT is waiting for amount - confirmation',
0x01: 'please watch PIN - Pad',
0x02: 'please watch PIN - Pads',
0x03: 'not accepted',
0x04: 'PT is waiting for response from FEP',
0x05: 'PT is sending auto - reversal',
0x06: 'PT is sending post - bookings',
0x07: 'card not admitted',
0x08: 'card unknown / undefined',
0x09: 'expired card',
0x0A: 'insert card',
0x0B: 'please remove card !',
0x0C: 'card not readable',
0x0D: 'processing error',
0x0E: 'please wait...',
0x0F: 'PT is commencing an automatic end - of - day batch',
0x10: 'invalid card',
0x11: 'balance display',
0x12: 'system malfunction',
0x13: 'payment not possible',
0x14: 'credit not sufficient',
0x15: 'incorrect PIN',
0x16: 'limit not sufficient',
0x17: 'please wait...',
0x18: 'PIN try limit exceeded',
0x19: 'card - data incorrect',
0x1A: 'service - mode',
0x1B: 'approved. please fill - up',
0x1C: 'approved. please take goods',
0x1D: 'declined',
0x26: 'PT is waiting for input of the mobile - number',
0x27: 'PT is waiting for repeat of mobile number',
0x41: 'please watch PIN - Pad please remove card !',
0x42: 'please watch PIN - Pad please remove card !',
0x43: 'not accepted, please remove card !',
0x44: 'PT is waiting for response from FEP please remove card !',
0x45: 'PT is sending auto - reversal please remove card !',
0x46: 'PT is sending post - booking please remove card !',
0x47: 'card not admitted please remove card !',
0x48: 'card unknown / undefined please remove card !',
0x49: 'expired card please remove card !',
0x4A: '',
0x4B: 'please remove card !',
0x4C: 'card not readable please remove card !',
0x4D: 'processing error please remove card !',
0x4E: 'please wait... please remove card !',
0x4F: 'PT is commencing an automatic end - of - day batch please remove card !',
0x50: 'invalid card please remove card !',
0x51: 'balance display please remove card !',
0x52: 'system malfunction please remove card !',
0x53: 'payment not possible please remove card !',
0x54: 'credit not sufficient please remove card !',
0x55: 'incorrect PIN please remove card !',
0x56: 'limit not sufficient please remove card !',
0x57: 'please wait... please remove card !',
0x58: 'PIN try limit exceeded please remove card !',
0x59: 'card - data incorrect please remove card !',
0x5A: 'service - mode please remove card !',
0x5B: 'approved. please fill - up please remove card !',
0x5C: 'approved. please take goods please remove card !',
0x5D: 'declined please remove card !',
0x66: 'PT is waiting for input of the mobil - number please remove card !',
0x67: 'PT is waiting for repeat of the mobil - number please remove card !',
0xC7: 'PT is waiting for input of the mileage',
0xC8: 'PT is waiting for cashier',
0xC9: 'PT is commencing an automatic diagnosis',
0xCA: 'PT is commencing an automatic initialisation',
0xCB: 'merchant - journal full',
0xCC: 'debit advice not possible, PIN required',
0xD2: 'connecting dial - up',
0xD3: 'dial - up connection made',
0xE0: 'PT is waiting for application - selection',
0xE1: 'PT is waiting for language - selection',
0xF1: 'offline',
0xF2: 'online',
0xF3: 'offline transaction',
0xFF: 'custom or unknown status.',
}
ERRORCODES = {
# ERRORCODES PAGE: 165
# SUBSTITUTED: ^[A-Fa-f\d]{2,2}
# TO: 0x\0: "
# AND: .*$ TO: \0",
0x00 : "00 no error",
# 01-63 01 99 errorcodes from network-operator system/authorisation-system",
0x64: "card not readable (LRC-/parity-error)",
0x65: "card-data not present (neither track-data nor chip found)",
0x66: "processing-error (also for problems with card-reader mechanism)",
0x67: "function not permitted for ec- and Maestro-cards",
0x68: "function not permitted for credit- and tank-cards",
0x6A: "turnover-file full",
0x6B: "function deactivated (PT not registered)",
0x6C: "abort via time-out or abort-key ",
0x6E: "card in blocked-list (response to command 06 E4)",
0x6F: "wrong currency",
0x71: "credit not sufficient (chip-card)",
0x72: "chip error ",
0x73: "card-data incorrect (e.g. country-key check, checksum-error)",
0x77: "end-of-day batch not possible ",
0x78: "card expired",
0x79: "card not yet valid",
0x7A: "card unknown",
0x7D: "communication error (communication module does not answer or is not present)",
0x83: "function not possible",
0x85: "key missing",
0x89: "PIN-pad defective",
0x9A: "trnasferprotocol- error",
0x9B: "error from dial-up/communication fault",
0x9C: "please wait",
0xA0: "receiver not ready",
0xA1: "remote station does not respond",
0xA3: "no connection",
0xA4: "submission of Geldkarte not possible",
0xB1: "memory full",
0xB2: "merchant-journal full",
0xB4: "already reversed",
0xB5: "reversal not possible",
0xB7: "pre-authorisation incorrect (amount too high) or amount wrong",
0xB8: "error pre-authorisation",
0xBF: "voltage supply to low (external power supply)",
0xC0: "card locking mechanism defective",
0xC1: "merchant-card locked ",
0xC2: "diagnosis required",
0xC3: "maximum amount exceeded",
0xC4: "card-profile invalid. New card-profiles must be loaded.",
0xC5: "payment method not supported",
# PAGE 166
0xC6 : "currency not applicable",
0xC8 : "amount zu small",
0xC9 : "max. transaction-amount zu small",
0xCB : "function only allowed in EURO",
0xCC : "printer not ready",
0xD2 : "function not permitted for service-cards/bank-customer-cards",
0xDC : "card inserted",
0xDD : "error during card-eject (for motor-insertion reader)",
0xDE : "error during card-insertion (for motor-insertion reader)",
0xE0 : "remote-maintenance activated",
0xE2 : "card-reader does not answer / card-reader defective",
0xE3 : "shutter closed",
0xE7 : "min. one goods-group not found",
0xE8 : "no goods-groups-table loaded",
0xE9 : "restriction-code not permitted",
0xEA : "card-code not permitted (e.g. card not activated via Diagnosis)",
0xEB : "function not executable (PIN-algorithm unknown)",
0xEC : "PIN-processing not possible",
0xED : "PIN-pad defective",
0xF0 : "open end-of-day batch present",
0xF1 : "ec-cash/Maestro offline error",
0xF5 : "OPT-error",
0xF6 : "OPT-data not available (= OPT personalisation required)",
0xFA : "error transmitting offline-transactions (clearing error)",
0xFB : "turnover data-set defective",
0xFC : "necessary device not present or defective",
0xFD : "baudrate not supported",
0xFE : "register unknown",
0xFF : "system error" # (= other/unknown error), See TLV tags 1F16 and 1F17
# ERRORCODES PAGE: 165
# SUBSTITUTED: ^[A-Fa-f\d]{2,2}
# TO: 0x\0: "
# AND: .*$ TO: \0",
0x00: "00 no error",
# 01-63 01 99 errorcodes from network-operator system/authorisation-system",
0x64: "card not readable (LRC-/parity-error)",
0x65: "card-data not present (neither track-data nor chip found)",
0x66: "processing-error (also for problems with card-reader mechanism)",
0x67: "function not permitted for ec- and Maestro-cards",
0x68: "function not permitted for credit- and tank-cards",
0x6A: "turnover-file full",
0x6B: "function deactivated (PT not registered)",
0x6C: "abort via time-out or abort-key ",
0x6E: "card in blocked-list (response to command 06 E4)",
0x6F: "wrong currency",
0x71: "credit not sufficient (chip-card)",
0x72: "chip error ",
0x73: "card-data incorrect (e.g. country-key check, checksum-error)",
0x77: "end-of-day batch not possible ",
0x78: "card expired",
0x79: "card not yet valid",
0x7A: "card unknown",
0x7D: "communication error (communication module does not answer or is not present)",
0x83: "function not possible",
0x85: "key missing",
0x89: "PIN-pad defective",
0x9A: "trnasferprotocol- error",
0x9B: "error from dial-up/communication fault",
0x9C: "please wait",
0xA0: "receiver not ready",
0xA1: "remote station does not respond",
0xA3: "no connection",
0xA4: "submission of Geldkarte not possible",
0xB1: "memory full",
0xB2: "merchant-journal full",
0xB4: "already reversed",
0xB5: "reversal not possible",
0xB7: "pre-authorisation incorrect (amount too high) or amount wrong",
0xB8: "error pre-authorisation",
0xBF: "voltage supply to low (external power supply)",
0xC0: "card locking mechanism defective",
0xC1: "merchant-card locked ",
0xC2: "diagnosis required",
0xC3: "maximum amount exceeded",
0xC4: "card-profile invalid. New card-profiles must be loaded.",
0xC5: "payment method not supported",
# PAGE 166
0xC6: "currency not applicable",
0xC8: "amount zu small",
0xC9: "max. transaction-amount zu small",
0xCB: "function only allowed in EURO",
0xCC: "printer not ready",
0xD2: "function not permitted for service-cards/bank-customer-cards",
0xDC: "card inserted",
0xDD: "error during card-eject (for motor-insertion reader)",
0xDE: "error during card-insertion (for motor-insertion reader)",
0xE0: "remote-maintenance activated",
0xE2: "card-reader does not answer / card-reader defective",
0xE3: "shutter closed",
0xE7: "min. one goods-group not found",
0xE8: "no goods-groups-table loaded",
0xE9: "restriction-code not permitted",
0xEA: "card-code not permitted (e.g. card not activated via Diagnosis)",
0xEB: "function not executable (PIN-algorithm unknown)",
0xEC: "PIN-processing not possible",
0xED: "PIN-pad defective",
0xF0: "open end-of-day batch present",
0xF1: "ec-cash/Maestro offline error",
0xF5: "OPT-error",
0xF6: "OPT-data not available (= OPT personalisation required)",
0xFA: "error transmitting offline-transactions (clearing error)",
0xFB: "turnover data-set defective",
0xFC: "necessary device not present or defective",
0xFD: "baudrate not supported",
0xFE: "register unknown",
0xFF: "system error" # (= other/unknown error), See TLV tags 1F16 and 1F17
}
TERMINAL_STATUS_CODES = {
0x00 : "PT ready",
0x51 : "initialisation required",
0x62 : "date/time incorrect",
0x9C : "please wait (e.g. software-update still running)",
0xB1 : "memory full",
0xB2 : "merchant-journal full",
0xBF : "voltage supply too low (external power supply)",
0xC0 : "card locking mechanism defect",
0xC1 : "merchant card locked",
0xC2 : "diagnosis required",
0xC4 : "card-profile invalid. New card-profiles must be loaded",
0xCC : "printer not ready",
0xDC : "card inserted",
0xDF : "out-of-order",
0xE0 : "remote-maintenance activated",
0xE1 : "card not completely removed",
0xE2 : "card-reader doe not answer / card-reader defective",
0xE3 : "shutter closed",
0xF6 : "OPT-data not availble (= OPT-Personalisation required)" }
0x00: "PT ready",
0x51: "initialisation required",
0x62: "date/time incorrect",
0x9C: "please wait (e.g. software-update still running)",
0xB1: "memory full",
0xB2: "merchant-journal full",
0xBF: "voltage supply too low (external power supply)",
0xC0: "card locking mechanism defect",
0xC1: "merchant card locked",
0xC2: "diagnosis required",
0xC4: "card-profile invalid. New card-profiles must be loaded",
0xCC: "printer not ready",
0xDC: "card inserted",
0xDF: "out-of-order",
0xE0: "remote-maintenance activated",
0xE1: "card not completely removed",
0xE2: "card-reader doe not answer / card-reader defective",
0xE3: "shutter closed",
0xF6: "OPT-data not availble (= OPT-Personalisation required)"}
"""
DEBUG_PACKET_NAME = {
@ -268,13 +268,16 @@ DEBUG_PACKET_NAME = {
}
"""
class Logling(object):
"""
a simple log interface
"""
def log(self, *args, **kwargs):
print(" ".join(args))
class Dumpling(object):
"""
Interface, which defines that this object can
@ -282,6 +285,7 @@ class Dumpling(object):
- tell you how much bytes of data it has.
- dumpling does not solve how you store your data.
"""
def dump(self):
"""
returns a list of bytes, representing this dumpling in the stream.
@ -295,31 +299,38 @@ class Dumpling(object):
"""
return len(self.dump())
class Transport(Logling):
def connect(self, *args, **kwargs):
"""
connect to transport.
"""
pass
def receive(self, timeout=None, *args, **kwargs):
"""
receive data.
"""
pass
def send(self, message, *args, **kwargs):
"""
send data.
"""
pass
class ZVTException(Exception):
pass
class TransportLayerException(ZVTException):
pass
class TransportTimeoutException(TransportLayerException):
pass
class ApplicationLayerException(ZVTException):
pass

View file

@ -195,7 +195,7 @@ def toHexString(bytes=None, format=0):
try:
from string import rstrip
except ImportError: # Python3
except ImportError: # Python3
def rstrip(s, chars=None):
return s.rstrip(chars)

View file

@ -9,7 +9,6 @@ if sys.version_info[0] == 2:
range = xrange
def build_codetable(poly):
"""
builds an ascii codetable for a polynome to be used by a crc checksum
@ -25,6 +24,7 @@ def build_codetable(poly):
crc_table += [crc]
return crc_table
def crc_checksum(something, poly=33800):
"""
makes a crc checksum with any given polynome, building the table
@ -38,9 +38,10 @@ def crc_checksum(something, poly=33800):
crc = crc_table[lb ^ ord(i)] ^ hb
return crc
#: poly = 0x8408
TABLE_XMODEM16 = [
0, 4489, 8978, 12955, 17956, 22445, 25910, 29887, 35912,
0, 4489, 8978, 12955, 17956, 22445, 25910, 29887, 35912,
40385, 44890, 48851, 51820, 56293, 59774, 63735, 4225, 264,
13203, 8730, 22181, 18220, 30135, 25662, 40137, 36160, 49115,
44626, 56045, 52068, 63999, 59510, 8450, 12427, 528, 5017,
@ -53,13 +54,13 @@ TABLE_XMODEM16 = [
64991, 60502, 39145, 35168, 48123, 43634, 25350, 29327, 16404,
20893, 9506, 13483, 1584, 6073, 61262, 65223, 52316, 56789,
43370, 47331, 35448, 39921, 29575, 25102, 20629, 16668, 13731,
9258, 5809, 1848, 65487, 60998, 56541, 52564, 47595, 43106,
9258, 5809, 1848, 65487, 60998, 56541, 52564, 47595, 43106,
39673, 35696, 33800, 38273, 42778, 46739, 49708, 54181, 57662,
61623, 2112, 6601, 11090, 15067, 20068, 24557, 28022, 31999,
38025, 34048, 47003, 42514, 53933, 49956, 61887, 57398, 6337,
2376, 15315, 10842, 24293, 20332, 32247, 27774, 42250, 46211,
2376, 15315, 10842, 24293, 20332, 32247, 27774, 42250, 46211,
34328, 38801, 58158, 62119, 49212, 53685, 10562, 14539, 2640,
7129, 28518, 32495, 19572, 24061, 46475, 41986, 38553, 34576,
7129, 28518, 32495, 19572, 24061, 46475, 41986, 38553, 34576,
62383, 57894, 53437, 49460, 14787, 10314, 6865, 2904, 32743,
28270, 23797, 19836, 50700, 55173, 58654, 62615, 32808, 37281,
41786, 45747, 19012, 23501, 26966, 30943, 3168, 7657, 12146,
@ -68,7 +69,7 @@ TABLE_XMODEM16 = [
63111, 50204, 54677, 41258, 45219, 33336, 37809, 27462, 31439,
18516, 23005, 11618, 15595, 3696, 8185, 63375, 58886, 54429,
50452, 45483, 40994, 37561, 33584, 31687, 27214, 22741, 18780,
15843, 11370, 7921, 3960 ]
15843, 11370, 7921, 3960]
def crc_xmodem16(something):

View file

@ -21,8 +21,11 @@ from ecrterm.utils import is_stringlike
class A(object):
def write(self, *args, **kwargs):
pass
_logfile = A()
def dismantle_serial_packet(data):
apdu = []
crc = None
@ -38,7 +41,7 @@ def dismantle_serial_packet(data):
# read until DLE, ETX is reached.
dle = False
while not crc and i < len(data):
b = data[i] # read a byte.
b = data[i] # read a byte.
if b == ETX and dle:
# dle was set, and this is ETX, so we are at the end.
# we read the CRC now.
@ -62,10 +65,11 @@ def dismantle_serial_packet(data):
i += 1
return crc, apdu
def parse_represented_data(data):
# represented data
if is_stringlike(data):
# we assume a bytelist like 10 02 03....
# we assume a bytelist like 10 02 03....
data = conv.toBytes(data)
# first of all, serial data starts with 10 02, so everything
# starting with 10 will be assumed as "serial packet" and first "demantled"
@ -84,6 +88,7 @@ def parse_represented_data(data):
p = Packet.parse(data)
return p
def ecr_log(data, incoming=False):
try:
if incoming:
@ -111,6 +116,7 @@ def ecr_log(data, incoming=False):
traceback.print_exc()
print("| error in log")
class ECR(object):
transmitter = None
transport = None
@ -130,7 +136,7 @@ class ECR(object):
"""
Initializes an ECR object and connects to the serial device given
Fails if Serial Device is not found.
You can access the Device on low level as the `transport`
You can access the Protocol Handler on low level as `transmission`
"""
@ -163,32 +169,31 @@ class ECR(object):
kwargs['config_byte'] = config_byte
ret = self.transmit(Registration(**kwargs))
if ret == TRANSMIT_OK:
# get the terminal-id if its there.
for inc, packet in self.transmitter.last_history:
if inc and isinstance(packet, Completion):
if 'tid' in packet.bitmaps_as_dict().keys():
self.terminal_id = packet.bitmaps_as_dict()\
.get('tid', BCD(0)).value()
.get('tid', BCD(0)).value()
# remember this.
self._state_registered = True
return ret
def register_unlocked(self):
"""
registers to the PT, not locking the master menu on it.
do not use in production environment.
"""
ret = self.transmit(
Registration(password=self.password,
config_byte=Registration.generate_config(
ecr_controls_admin=False),))
Registration(password=self.password,
config_byte=Registration.generate_config(
ecr_controls_admin=False),))
if ret == TRANSMIT_OK:
self._state_registered = True
return ret
def _end_of_day_info_packet(self, history=None):
'''
search for an end of day packet status information in the last packets
@ -198,7 +203,7 @@ class ECR(object):
status_info = None
plist = history or self.transmitter.last_history
for inc, packet in plist:
if inc: # incoming
if inc: # incoming
if isinstance(packet, StatusInformation):
status_info = packet
if status_info:
@ -207,12 +212,11 @@ class ECR(object):
eod_info['terminal-id'] = self.terminal_id
return eod_info
def end_of_day(self):
"""
- sends an end of day packet.
- saves the log in `daylog`
@returns: 0 if there were no protocol errors.
"""
#old_histoire = self.transmitter.history
@ -221,7 +225,7 @@ class ECR(object):
result = self.transmit(EndOfDay(self.password))
# now save the log
self.daylog = self.last_printout()
if not self.daylog:
# there seems to be no printout. we search in statusinformation.
eod_info = self._end_of_day_info_packet()
@ -243,7 +247,7 @@ class ECR(object):
inc, packet = entry
#old_histoire += [(inc, packet)]
if inc and isinstance(packet, PrintLine):
printout += [ packet.fixed_values['text'] ]
printout += [packet.fixed_values['text']]
return printout
def payment(self, amount_cent=50):
@ -254,9 +258,9 @@ class ECR(object):
throws exceptions.
"""
pkg = Authorisation(
amount=amount_cent, # in cents.
currency_code=978, #euro, only one that works, can be skipped.
)
amount=amount_cent, # in cents.
currency_code=978, # euro, only one that works, can be skipped.
)
code = self.transmit(pkg)
if code == 0:
@ -295,11 +299,11 @@ class ECR(object):
beeps=0):
"""
displays a text on the PT screen for duration of seconds.
@param lines: a list of strings.
@param duration: 0 for forever.
@param beeps: make some noise.
@note: any error due to wrong strings given are not checked.
"""
lines = lines or ['Hello world!', ]
@ -321,7 +325,7 @@ class ECR(object):
errors:
returns None if no status was transmitted.
returns False on transmit errors.
to check for the status code:
common.TERMINAL_STATUS_CODES.get( status, 'Unknown' )
"""
@ -330,18 +334,18 @@ class ECR(object):
if isinstance(self.last.completion, Completion):
# try to get version
if not self.version:
self.version = self.last.completion.fixed_values.get('sw-version', None)
self.version = self.last.completion.fixed_values.get(
'sw-version', None)
return self.last.completion.fixed_values.get('terminal-status', None)
# no completion means some error.
return False
def transmit(self, packet):
"""
transmits a packet, therefore introducing the protocol cascade.
rewrite this function if you want packets be routed anywhere
since the whole ECR Object uses this function to transmit.
use `last` property to access last packet transmitted.
"""
# we actually make a small sleep, allowing better flow.
@ -406,11 +410,12 @@ class ECR(object):
def parse_str(self, s):
return parse_represented_data(s)
if __name__ == '__main__':
_logfile = open('./terminallog.txt', 'aw')
_logfile.write('-MARK-\n')
e = ECR()
#e.end_of_day()
# e.end_of_day()
e.show_text(['Hello world!', 'Testing', 'myself.'], 5, 0)
print("preparing for payment.")
e.get_ready()

View file

@ -13,74 +13,74 @@
from ecrterm.packets.bmp import BMP
BITMAPS = {
0x01 : (BMP.FormatByte(1), 'timeout', "binary time-out"),
0x02 : (BMP.FormatByte(1), 'max_status_infos', "binary max.status infos"),
0x03 : (BMP.FormatByte(1), 'service_byte', "binary service-byte"),
0x04 : (BMP.FormatBCDByte(6), 'amount', "Amount"),
0x05 : (BMP.FormatByte(1), 'pump_nr', "binary pump-Nr."),
0x06 : (BMP.FormatTLV(), 'tlv', "TLV"),
0x0B : (BMP.FormatBCDByte(3), 'trace_number', "trace-number"),
0x0C : (BMP.FormatBCDByte(3), 'time', "Time"),
0x0D : (BMP.FormatBCDByte(2), 'date_day', "date, MM DD (see AA)"),
0x0E : (BMP.FormatBCDByte(2), 'card_expire', "expiry-date, YY MM"),
0x17 : (BMP.FormatBCDByte(2), 'card_sequence_number', "card sequence-number"),
0x19 : (BMP.FormatByte(1), 'type', "binary status-byte/payment-type/card-type"),
0x22 : (BMP.FormatLLVAR(), 'card_number', "card_number, PAN / EF_ID, 'E' used to indicate masked numeric digit"),
0x23 : (BMP.FormatLLVAR(), 'track_2', "track 2 data, 'E' used to indicate masked numeric digit1"),
0x24 : (BMP.FormatLLLVAR(), 'track_3', "track 3 data, 'E' used to indicate masked numeric digit1"),
0x27 : (BMP.FormatByte(1), 'result_code', "binary result-code"),
0x29 : (BMP.FormatBCDByte(4), 'tid', "TID"),
0x2A : (BMP.FormatByte(15), 'vu', "ASCII VU-number"),
0x2D : (BMP.FormatLLVAR(), 'track_1', "track 1 data"),
0x2E : (BMP.FormatLLLVAR(), 'sync_chip_data', "sychronous chip data"),
0x37 : (BMP.FormatBCDByte(3), 'trace_number_original', "trace-number of the original transaction for reversal"),
0x01: (BMP.FormatByte(1), 'timeout', "binary time-out"),
0x02: (BMP.FormatByte(1), 'max_status_infos', "binary max.status infos"),
0x03: (BMP.FormatByte(1), 'service_byte', "binary service-byte"),
0x04: (BMP.FormatBCDByte(6), 'amount', "Amount"),
0x05: (BMP.FormatByte(1), 'pump_nr', "binary pump-Nr."),
0x06: (BMP.FormatTLV(), 'tlv', "TLV"),
0x0B: (BMP.FormatBCDByte(3), 'trace_number', "trace-number"),
0x0C: (BMP.FormatBCDByte(3), 'time', "Time"),
0x0D: (BMP.FormatBCDByte(2), 'date_day', "date, MM DD (see AA)"),
0x0E: (BMP.FormatBCDByte(2), 'card_expire', "expiry-date, YY MM"),
0x17: (BMP.FormatBCDByte(2), 'card_sequence_number', "card sequence-number"),
0x19: (BMP.FormatByte(1), 'type', "binary status-byte/payment-type/card-type"),
0x22: (BMP.FormatLLVAR(), 'card_number', "card_number, PAN / EF_ID, 'E' used to indicate masked numeric digit"),
0x23: (BMP.FormatLLVAR(), 'track_2', "track 2 data, 'E' used to indicate masked numeric digit1"),
0x24: (BMP.FormatLLLVAR(), 'track_3', "track 3 data, 'E' used to indicate masked numeric digit1"),
0x27: (BMP.FormatByte(1), 'result_code', "binary result-code"),
0x29: (BMP.FormatBCDByte(4), 'tid', "TID"),
0x2A: (BMP.FormatByte(15), 'vu', "ASCII VU-number"),
0x2D: (BMP.FormatLLVAR(), 'track_1', "track 1 data"),
0x2E: (BMP.FormatLLLVAR(), 'sync_chip_data', "sychronous chip data"),
0x37: (BMP.FormatBCDByte(3), 'trace_number_original', "trace-number of the original transaction for reversal"),
0x3A: (BMP.FormatBCDByte(2), 'cvv', 'the field cvv is optionally used for mail order'),
0x3B : (BMP.FormatByte(8), 'aid', "AID authorisation-attribute"),
0x3C : (BMP.FormatLLLVAR(), 'additional', "additional-data/additional-text"),
0x3D : (BMP.FormatBCDByte(3), 'password', "Password"),
0x49 : (BMP.FormatBCDByte(2), 'currency_code', "currency code"),
0x60 : (BMP.FormatLLLVAR(), 'totals', "individual totals"),
0x87 : (BMP.FormatBCDByte(2), 'receipt', "receipt-number"),
0x88 : (BMP.FormatBCDByte(3), 'turnover', "turnover record number"),
0x8A : (BMP.FormatByte(1), 'card_type', "binary card-type (card-number according to ZVT-protocol; comparison 8C)"),
0x8B : (BMP.FormatLLVAR(), 'card_name', "card-name"),
0x8C : (BMP.FormatByte(1), 'card_operator', "binary card-type-ID of the network operator (comparison 8A)"),
0x92 : (BMP.FormatLLLVAR(), 'offline_chip', "additional-data ec-Cash with chip offline"),
0x9A : (BMP.FormatLLLVAR(), 'geldkarte', "Geldkarte payments-/ failed-payment record/total record Geldkarte"),
0xA0 : (BMP.FormatByte(1), 'result_code_as', "binary result-code-AS"),
0xA7 : (BMP.FormatLLVAR(), 'chip_ef_id', "chip-data, EF_ID"),
0xAA : (BMP.FormatBCDByte(3), 'date', "date YY MM DD (see 0D)"),
0xAF : (BMP.FormatLLLVAR(), 'ef_info', "EF_Info"),
0xBA : (BMP.FormatByte(5), 'aid_param', "binary AID-parameter"),
0xD0 : (BMP.FormatByte(1), 'algo_key', "binary algorithm-Key"),
0xD1 : (BMP.FormatLLVAR(), 'offset', "card offset/PIN-data"),
0xD2 : (BMP.FormatByte(1), 'direction', "binary direction"),
0xD3 : (BMP.FormatByte(1), 'key_position', "binary key-position"),
0xE0 : (BMP.FormatByte(1), 'input_min', "binary min. length of the input"),
0xE1 : (BMP.FormatLLVAR(), 'iline1', "text2 line 1"),
0xE2 : (BMP.FormatLLVAR(), 'iline2', "text2 line 2"),
0xE3 : (BMP.FormatLLVAR(), 'iline3', "text2 line 3"),
0xE4 : (BMP.FormatLLVAR(), 'iline4', "text2 line 4"),
0xE5 : (BMP.FormatLLVAR(), 'iline5', "text2 line 5"),
0xE6 : (BMP.FormatLLVAR(), 'iline6', "text2 line 6"),
0xE7 : (BMP.FormatLLVAR(), 'iline7', "text2 line 7"),
0xE8 : (BMP.FormatLLVAR(), 'iline8', "text2 line 8"),
0xE9 : (BMP.FormatByte(1), 'max_input_length', "binary max. length of the input"),
0xEA : (BMP.FormatByte(1), 'input_echo', "binary echo the Input"),
0xEB : (BMP.FormatByte(8), 'mac', "binary MAC over text 1 and text 2"),
0xF0 : (BMP.FormatByte(1), 'display_duration', "binary display-duration"),
0xF1 : (BMP.FormatLLVAR(), 'line1', "text1 line 1"),
0xF2 : (BMP.FormatLLVAR(), 'line2', "text1 line 2"),
0xF3 : (BMP.FormatLLVAR(), 'line3', "text1 line 3"),
0xF4 : (BMP.FormatLLVAR(), 'line4', "text1 line 4"),
0xF5 : (BMP.FormatLLVAR(), 'line5', "text1 line 5"),
0xF6 : (BMP.FormatLLVAR(), 'line6', "text1 line 6"),
0xF7 : (BMP.FormatLLVAR(), 'line7', "text1 line 7"),
0xF8 : (BMP.FormatLLVAR(), 'line8', "text1 line 8"),
0xF9 : (BMP.FormatByte(1), 'beeps', "binary number of beep-tones"),
0xFA : (BMP.FormatByte(1), 'status', "binary status"),
0xFB : (BMP.FormatByte(1), 'ok_required', "binary confirmation the input with <OK> required"),
0xFC : (BMP.FormatByte(1), 'dialog_control', "binary dialog-control"),
0x3B: (BMP.FormatByte(8), 'aid', "AID authorisation-attribute"),
0x3C: (BMP.FormatLLLVAR(), 'additional', "additional-data/additional-text"),
0x3D: (BMP.FormatBCDByte(3), 'password', "Password"),
0x49: (BMP.FormatBCDByte(2), 'currency_code', "currency code"),
0x60: (BMP.FormatLLLVAR(), 'totals', "individual totals"),
0x87: (BMP.FormatBCDByte(2), 'receipt', "receipt-number"),
0x88: (BMP.FormatBCDByte(3), 'turnover', "turnover record number"),
0x8A: (BMP.FormatByte(1), 'card_type', "binary card-type (card-number according to ZVT-protocol; comparison 8C)"),
0x8B: (BMP.FormatLLVAR(), 'card_name', "card-name"),
0x8C: (BMP.FormatByte(1), 'card_operator', "binary card-type-ID of the network operator (comparison 8A)"),
0x92: (BMP.FormatLLLVAR(), 'offline_chip', "additional-data ec-Cash with chip offline"),
0x9A: (BMP.FormatLLLVAR(), 'geldkarte', "Geldkarte payments-/ failed-payment record/total record Geldkarte"),
0xA0: (BMP.FormatByte(1), 'result_code_as', "binary result-code-AS"),
0xA7: (BMP.FormatLLVAR(), 'chip_ef_id', "chip-data, EF_ID"),
0xAA: (BMP.FormatBCDByte(3), 'date', "date YY MM DD (see 0D)"),
0xAF: (BMP.FormatLLLVAR(), 'ef_info', "EF_Info"),
0xBA: (BMP.FormatByte(5), 'aid_param', "binary AID-parameter"),
0xD0: (BMP.FormatByte(1), 'algo_key', "binary algorithm-Key"),
0xD1: (BMP.FormatLLVAR(), 'offset', "card offset/PIN-data"),
0xD2: (BMP.FormatByte(1), 'direction', "binary direction"),
0xD3: (BMP.FormatByte(1), 'key_position', "binary key-position"),
0xE0: (BMP.FormatByte(1), 'input_min', "binary min. length of the input"),
0xE1: (BMP.FormatLLVAR(), 'iline1', "text2 line 1"),
0xE2: (BMP.FormatLLVAR(), 'iline2', "text2 line 2"),
0xE3: (BMP.FormatLLVAR(), 'iline3', "text2 line 3"),
0xE4: (BMP.FormatLLVAR(), 'iline4', "text2 line 4"),
0xE5: (BMP.FormatLLVAR(), 'iline5', "text2 line 5"),
0xE6: (BMP.FormatLLVAR(), 'iline6', "text2 line 6"),
0xE7: (BMP.FormatLLVAR(), 'iline7', "text2 line 7"),
0xE8: (BMP.FormatLLVAR(), 'iline8', "text2 line 8"),
0xE9: (BMP.FormatByte(1), 'max_input_length', "binary max. length of the input"),
0xEA: (BMP.FormatByte(1), 'input_echo', "binary echo the Input"),
0xEB: (BMP.FormatByte(8), 'mac', "binary MAC over text 1 and text 2"),
0xF0: (BMP.FormatByte(1), 'display_duration', "binary display-duration"),
0xF1: (BMP.FormatLLVAR(), 'line1', "text1 line 1"),
0xF2: (BMP.FormatLLVAR(), 'line2', "text1 line 2"),
0xF3: (BMP.FormatLLVAR(), 'line3', "text1 line 3"),
0xF4: (BMP.FormatLLVAR(), 'line4', "text1 line 4"),
0xF5: (BMP.FormatLLVAR(), 'line5', "text1 line 5"),
0xF6: (BMP.FormatLLVAR(), 'line6', "text1 line 6"),
0xF7: (BMP.FormatLLVAR(), 'line7', "text1 line 7"),
0xF8: (BMP.FormatLLVAR(), 'line8', "text1 line 8"),
0xF9: (BMP.FormatByte(1), 'beeps', "binary number of beep-tones"),
0xFA: (BMP.FormatByte(1), 'status', "binary status"),
0xFB: (BMP.FormatByte(1), 'ok_required', "binary confirmation the input with <OK> required"),
0xFC: (BMP.FormatByte(1), 'dialog_control', "binary dialog-control"),
}
BITMAPS_ARGS = {}

View file

@ -15,10 +15,12 @@ from ecrterm.utils import is_stringlike
if sys.version_info[0] == 2:
range = xrange
def int_word_split(x, endian='>'): # default big endian.
def int_word_split(x, endian='>'): # default big endian.
""" splits 2byte integer (sometimes called a word) into 2 byte list"""
return conv.bs2hl(struct.pack('%sH' % endian, x & 0xFFFF))
class BMPFactory(Dumpling):
@classmethod
def FormatByte(cls, length=1):
@ -62,14 +64,17 @@ class BMPFactory(Dumpling):
bmp._key = bmp_key
rest = bmp.parse(data)
if len(rest) and (len(rest) == len(data)):
raise NotImplemented("Bitmap Class without parsing mechanism detected")
raise NotImplemented(
"Bitmap Class without parsing mechanism detected")
return bmp, rest
class BMP(BMPFactory):
_id = 0x0
_data = None
_descr = ""
_key = ''
def get_id(self):
return self._id or 0x0
id = property(get_id)
@ -84,7 +89,7 @@ class BMP(BMPFactory):
elif isinstance(data, list):
self._data = data
else:
self._data = [ data ]
self._data = [data]
self._rangecheck()
def value(self):
@ -122,7 +127,7 @@ class BMP(BMPFactory):
>>> [ hex(i) for i in BMP.encode_fcd( 1234 ) ]
['0xf1', '0xf2', '0xf3', '0xf4']
"""
return [ factor + int(i) for i in list(str(int(x)))]
return [factor + int(i) for i in list(str(int(x)))]
@classmethod
def decode_fcd(cls, number_list, factor=0xf0):
@ -138,6 +143,7 @@ class BMP(BMPFactory):
break
return ret
class LVAR(BMP):
"""
LVAR Abstract Class
@ -145,8 +151,8 @@ class LVAR(BMP):
ZVT Protocol.
also implements bases for LLVar and LLLVar.
"""
_id = None # the lvar does not know its id from start.
LL = 0 #: length of length header minimum.
_id = None # the lvar does not know its id from start.
LL = 0 # : length of length header minimum.
_data = []
def __init__(self, data=None):
@ -170,25 +176,26 @@ class LVAR(BMP):
def value(self):
return conv.hl2bs(self._data)
def dump(self): # dump the bytes.
def dump(self): # dump the bytes.
"""
dumps the bytes of the LVAR as one list.
the minimum length of the length header can be set with self.LL
"""
ret = []
if self._id:
ret = [ self._id ]
lines = [ self._data, ]
ret = [self._id]
lines = [self._data, ]
for line in lines:
l = LVAR.length(len(line))
while len(l) < self.LL:
l = [ 0xF0 ] + l
l = [0xF0] + l
if is_stringlike(line):
ret += l + conv.bs2hl(line)
elif isinstance(line, list):
ret += l + line
else:
raise TypeError("Line has unsupported type in LVAR: %s" % type(line))
raise TypeError(
"Line has unsupported type in LVAR: %s" % type(line))
return ret
def parse(self, data):
@ -200,8 +207,8 @@ class LVAR(BMP):
l = BMP.decode_fcd(l)
# get the data
data = data[self.LL:]
self._data = data[:l] # conversion of any kinds ?
return data[l:] # we return the rest.
self._data = data[:l] # conversion of any kinds ?
return data[l:] # we return the rest.
@classmethod
def length(cls, length):
@ -212,6 +219,7 @@ class LVAR(BMP):
"""
return BMP.encode_fcd(length)
class LLVAR(LVAR):
"""
each LLVar Line has a length code of FxFy,
@ -219,6 +227,7 @@ class LLVAR(LVAR):
"""
LL = 2
class LLLVAR(LVAR):
"""
each LLLVar Line has a length code of FxFyFz,
@ -226,10 +235,13 @@ class LLLVAR(LVAR):
"""
LL = 3
class FixedLength(BMP):
_length = 0
def get_length(self):
return self._length
def set_length(self, length):
self._length = length
length = property(get_length, set_length)
@ -244,20 +256,22 @@ class FixedLength(BMP):
ret = []
# first encode our bitmap id.
if self._id:
ret = [ self._id ]
ret = [self._id]
if is_stringlike(self._data):
ret += [ ord(c) for c in self._data[:self.length]]
ret += [ord(c) for c in self._data[:self.length]]
else:
ret += self._data[:self.length]
return ret
# two simple classes (BCD and BYTE)
class BCD(FixedLength):
@classmethod
def as_int(cls, a_list):
''' represent a bcd list as integer '''
return int(''.join([str(a) for a in a_list]))
@classmethod
def bcd_split(cls, b):
""" splits a bcd byte into a tuple of numbers """
@ -291,13 +305,13 @@ class BCD(FixedLength):
"""
@param something: a list of numbers, all < 10
@return: a list of bytes.
Note: this function fills up numbers missing with 0,
except you tell strict to be True.
"""
if is_stringlike(something):
# you gave something like "123456"
something = [ int(x) for x in something ]
something = [int(x) for x in something]
# check the length if even
if len(something) % 2:
something = [0] + something
@ -329,7 +343,7 @@ class BCD(FixedLength):
values = self.values()
if not values:
return ''
#return ''.join(values)
# return ''.join(values)
return '%s' * len(values) % tuple(values)
def __repr__(self):
@ -342,7 +356,7 @@ class BCD(FixedLength):
ret = []
# first encode our bitmap id.
if self._id:
ret = [ self._id ]
ret = [self._id]
# now look up our length.
# our data has to be same length !
data = self._data[:]
@ -350,6 +364,7 @@ class BCD(FixedLength):
data = [00, ] + data
return ret + data
class BYTE(FixedLength):
def __repr__(self):
return "Bitmap %s, <BYTES %s>" % (self._key, self._length)

View file

@ -29,23 +29,23 @@ class TLV(BMP):
""" transforms a number into a TLV Length
returns list of bytes
"""
if length >= 0x80: # 128 or more...
if length >= 0x80: # 128 or more...
# we need more than 1 byte.
# lets see if we need only 2:
if length > 0xff: # 256 or more..
if length > 0xff: # 256 or more..
# 0x82 followed by high byte and low byte.
hb = (length & 0xFF00) >> 8
lb = length & 0xFF
return [ 0x80 + 2, hb, lb ]
return [0x80 + 2, hb, lb]
else:
return [0x80 + 1, length ]
return [0x80 + 1, length]
else:
# one byte is enough.
return [ length, ]
return [length, ]
def parse(self, data):
# just find out the length and skip that stuff
#if not data:
# if not data:
# # sometimes an empty TLV container happens.
# return []
l1 = data[0]

View file

@ -1,15 +1,16 @@
# -*- coding: utf-8 -*-
"""
Test for Data Encoding
Test for Data Encoding
All Packets tested here should be those which are SENT to the PT mainly.
you can see the incoming tests in parsing.
All Packets tested here should be those which are SENT to the PT mainly.
you can see the incoming tests in parsing.
Lets test if packets are encoded right
Lets test if packets are encoded right
Unter docs/examples finden sich dateien mit logs.
Diese Tests sehen nach ob unsere Klassen dieselben binären daten erzeugen.
Unter docs/examples finden sich dateien mit logs.
Diese Tests sehen nach ob unsere Klassen dieselben binären daten
erzeugen.
"""
import unittest
@ -22,9 +23,10 @@ from ecrterm.transmission import ACK, NAK, SerialMessage
def list_of_bytes(apdu):
sm = SerialMessage(apdu)
byte_list = sm.dump_message()
#return " ".join(["%02h" % i for i in byte_list])
# return " ".join(["%02h" % i for i in byte_list])
return conv.toHexString(byte_list)
class TestCaseDataEncoding(unittest.TestCase):
def setUp(self):
@ -41,7 +43,6 @@ class TestCaseDataEncoding(unittest.TestCase):
self.assertEqual(data_expected, list_of_bytes(pk))
#
def test_Initialisierung(self):
# Initialization Std.
data_expected = """10 02 06 93 03 12 34 56 10 03 CA A4"""
@ -89,14 +90,14 @@ class TestCaseDataEncoding(unittest.TestCase):
def test_packet_showtext(self):
data_expected = """10 02 06 E0 25 F2 F1 F5 45 49 4E 47 45 42 45 4E 20 55 4E 44 20 4F 4B F1 F1 F6 46 41 48 52 45 52 4E 55 4D 4D 45 52 20 20 20 20 10 03 5A BA"""
lines = ['FAHRERNUMMER ', 'EINGEBEN UND OK', ]
#F1 F1 F6 46 41 48 52 45 52 4E 55 4D 4D 45 52 20 20 20 20 //FAHRERNUMMER
#F2 F1 F5 45 49 4E 47 45 42 45 4E 20 55 4E 44 20 4F 4B//EINGEBEN UND OK
# F1 F1 F6 46 41 48 52 45 52 4E 55 4D 4D 45 52 20 20 20 20 //FAHRERNUMMER
# F2 F1 F5 45 49 4E 47 45 42 45 4E 20 55 4E 44 20 4F 4B//EINGEBEN UND OK
pk = ShowText(
#display_duration=0,
# display_duration=0,
line1=lines[0],
#beeps=5,
# beeps=5,
line2=lines[1],
)
)
self.assertEqual(data_expected, list_of_bytes(pk))
def test_packet_statusenquiry(self):
@ -105,6 +106,5 @@ class TestCaseDataEncoding(unittest.TestCase):
self.assertEqual(data_expected, list_of_bytes(pk))
if __name__ == '__main__':
unittest.main()

View file

@ -1,8 +1,8 @@
# -*- coding: utf-8 -*-
"""
Misc. Tests.
@author g4b
Misc. Tests.
@author g4b
"""
import unittest
@ -28,8 +28,8 @@ class TestSequenceFunctions(unittest.TestCase):
# now test the full ones
password = '123456'
bcd_pass = [ 0x12, 0x34, 0x56 ]
password_nums = [ int(x) for x in password ]
bcd_pass = [0x12, 0x34, 0x56]
password_nums = [int(x) for x in password]
self.assertEqual(BCD.encode_bcd(password),
bcd_pass)
self.assertEqual(BCD.decode_bcd(bcd_pass),
@ -37,24 +37,23 @@ class TestSequenceFunctions(unittest.TestCase):
# test instantiation:
b = BCD(password)
b._length = 3
#print b.values()
# print b.values()
self.assertEqual(b.value(), password)
# test dumping
b = BCD(1)
b._length = 3
b._id = 666 # this is actually impossible, but manually valid.
b._id = 666 # this is actually impossible, but manually valid.
d = b.dump()
self.assertEqual(d,
[666, 0, 0, 1])
def test_bmp(self):
"""
test if the classmethods in bmp work
"""
bignum = 4321056789
fcd_seq = [ 0xf4, 0xf3, 0xf2, 0xf1, 0xf0,
0xf5, 0xf6, 0xf7, 0xf8, 0xf9]
fcd_seq = [0xf4, 0xf3, 0xf2, 0xf1, 0xf0,
0xf5, 0xf6, 0xf7, 0xf8, 0xf9]
self.assertEqual(
BCD.encode_fcd(bignum),
fcd_seq)
@ -65,5 +64,6 @@ class TestSequenceFunctions(unittest.TestCase):
def test_llvar(self):
pass
if __name__ == '__main__':
unittest.main()

View file

@ -29,7 +29,7 @@ class TestParsingMechanisms(unittest.TestCase):
for packet in PACKETS:
rep = parse_represented_data(conv.toHexString(packet().to_list()))
self.assertEqual(rep.__class__,
packet)
packet)
def test_version_completion(self):
# following completion is sent by the PT with version on statusenquiry:
@ -38,7 +38,6 @@ class TestParsingMechanisms(unittest.TestCase):
rep = parse_represented_data(data_expected)
self.assertEqual(rep.__class__, Completion)
def test_parsing_two(self):
"""
parse some packets
@ -53,7 +52,7 @@ class TestParsingMechanisms(unittest.TestCase):
# 04 0F
'10 02 04 0F 37 27 00 04 00 00 00 00 40 00 49 09 78 0C 09 38 48 0D 04 25 22 F1 F1 59 66 66 66 66'\
'D2 00 21 22 01 00 17 00 01 87 01 75 0B 61 39 95 19 40 29 60 09 99 14 0E 05 12 8A 02 10 03 90 8C',
]
]
i = 0
for packet in PACKETS:
rep = parse_represented_data(packet)
@ -62,5 +61,6 @@ class TestParsingMechanisms(unittest.TestCase):
raise AssertionError("Packet could not be parsed: #%s" % i)
i += 1
if __name__ == '__main__':
unittest.main()

View file

@ -27,7 +27,7 @@ class Transmission(object):
self.transport = transport
self.is_master = True
self.is_waiting = False
self.last = None # saves last sent master
self.last = None # saves last sent master
self.log_list = []
self.history = []
self.last_history = []
@ -70,7 +70,8 @@ class Transmission(object):
whole sequence is finished.
"""
if not self.is_master or self.is_waiting:
raise TransmissionException("Can't send until transmisson is ready")
raise TransmissionException(
"Can't send until transmisson is ready")
self.is_master = False
self.last = packet
try:
@ -80,10 +81,12 @@ class Transmission(object):
# we sent the packet.
# now lets wait until we get master back.
while not self.is_master:
self.is_master = self.handle_packet_response(self.last, response)
self.is_master = self.handle_packet_response(
self.last, response)
if not self.is_master:
try:
success, response = self.transport.receive(self.actual_timeout)
success, response = self.transport.receive(
self.actual_timeout)
history += [(True, response)]
except common.TransportLayerException:
# some kind of timeout.
@ -95,7 +98,8 @@ class Transmission(object):
return TRANSMIT_TIMEOUT
if self.is_master and success:
# we actually have to handle a last packet
stay_master = self.handle_packet_response(packet, response)
stay_master = self.handle_packet_response(
packet, response)
print("Is Master Read Ahead happened.")
self.is_master = stay_master
except Exception as e:

View file

@ -2,8 +2,8 @@
TIMEOUT_T1 = 0.2
TIMEOUT_T2 = 15
TIMEOUT_T4 = 180
TIMEOUT_T4_DEFAULT = 180 # sec
TIMEOUT_T3 = 5 # sec
TIMEOUT_T4_DEFAULT = 180 # sec
TIMEOUT_T3 = 5 # sec
#: command separator
DLE = 0x10

View file

@ -19,6 +19,8 @@ from ecrterm.transmission.signals import *
from ecrterm.utils import ensure_bytes, is_stringlike
SERIAL_DEBUG = False
def std_serial_log(instance, data, incoming=False):
try:
if is_stringlike(incoming):
@ -30,9 +32,11 @@ def std_serial_log(instance, data, incoming=False):
except:
print("| error in log")
def noop(*args, **kwargs):
pass
class SerialMessage(object):
"""
Converts a Packet into a serial message by serializing the packet
@ -40,6 +44,7 @@ class SerialMessage(object):
CRC and double-DLEs included.
"""
apdu = None
def __init__(self, apdu=None):
if is_stringlike(apdu):
# try to get the list of bytes.
@ -49,8 +54,8 @@ class SerialMessage(object):
self.apdu = apdu
def _get_crc(self):
data = conv.hl2bs(self.apdu + [ ETX ])
#print "crc for %s => %s" % ([hex(i) for i in self.apdu], hex(crc.crc_xmodem16(data)))
data = conv.hl2bs(self.apdu + [ETX])
# print "crc for %s => %s" % ([hex(i) for i in self.apdu], hex(crc.crc_xmodem16(data)))
try:
return crc.crc_xmodem16(data)
except:
@ -59,20 +64,23 @@ class SerialMessage(object):
def _get_crc_l(self):
return self._get_crc() & 0x00FF
def _get_crc_h(self):
return (self._get_crc() & 0xFF00) >> 8
crc_l = property(_get_crc_l)
crc_h = property(_get_crc_h)
def crc(self):
return [ self.crc_l, self.crc_h ]
return [self.crc_l, self.crc_h]
def enrich(self, apdu):
# add 0x10 to each 0x10 in apdu
apdu = apdu[:] # since we use del later, it would occasionally hit the instance
# since we use del later, it would occasionally hit the instance
apdu = apdu[:]
new_apdu = []
while len(apdu):
if apdu.count(DLE):
new_apdu += apdu[:apdu.index(DLE) + 1] + [ DLE ]
new_apdu += apdu[:apdu.index(DLE) + 1] + [DLE]
del apdu[:apdu.index(DLE) + 1]
else:
new_apdu += apdu
@ -81,17 +89,19 @@ class SerialMessage(object):
def __repr__(self):
return "SerialMessage (APDU: %s, CRC-L: %s CRC-H: %s)" % (
conv.toHexString(self.apdu),
hex(self.crc_l),
hex(self.crc_h))
conv.toHexString(self.apdu),
hex(self.crc_l),
hex(self.crc_h))
def dump_message(self):
#if 0x10 in apdu:
# if 0x10 in apdu:
apdu = self.enrich(self.apdu)
return [DLE, STX] + apdu + [ DLE, ETX, self.crc_l, self.crc_h]
return [DLE, STX] + apdu + [DLE, ETX, self.crc_l, self.crc_h]
def as_bin(self):
return conv.hl2bs(self.dump_message())
class SerialTransport(common.Transport):
SerialCls = serial.Serial
slog = noop
@ -102,15 +112,15 @@ class SerialTransport(common.Transport):
def connect(self, timeout=30):
ser = self.SerialCls(
port=self.device,
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_TWO,
bytesize=serial.EIGHTBITS,
timeout=timeout, # set a timeout value, None for waiting forever
xonxoff=0, # disable software flow control
rtscts=0, # disable RTS/CTS flow control
)
port=self.device,
baudrate=9600,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_TWO,
bytesize=serial.EIGHTBITS,
timeout=timeout, # set a timeout value, None for waiting forever
xonxoff=0, # disable software flow control
rtscts=0, # disable RTS/CTS flow control
)
if ser.isOpen() == False:
ser.open()
# 8< got that from somwhere, not sure what it does:
@ -138,7 +148,7 @@ class SerialTransport(common.Transport):
try:
self.slog(conv.bs2hl(something))
finally:
self.connection.write(ensure_bytes(something)) # !?
self.connection.write(ensure_bytes(something)) # !?
def write_ack(self):
# writes an ack.
@ -175,16 +185,18 @@ class SerialTransport(common.Transport):
# timeout to T1 after header.
self.connection.timeout = TIMEOUT_T1
while not crc:
b = ord(self.connection.read(1)) # read a byte.
b = ord(self.connection.read(1)) # read a byte.
if b is None:
# timeout
raise common.TransportLayerException("Timeout T1 reading stream.")
raise common.TransportLayerException(
"Timeout T1 reading stream.")
if b == ETX and dle:
# dle was set, and this is ETX, so we are at the end.
# we read the CRC now.
crc = self.connection.read(2)
if not crc:
raise common.TransportLayerException("Timeout T1 reading CRC")
raise common.TransportLayerException(
"Timeout T1 reading CRC")
else:
crc = conv.bs2hl(crc)
# and break
@ -200,7 +212,8 @@ class SerialTransport(common.Transport):
elif dle:
# dle was set, but we got no etx here.
# this seems to be an error.
raise common.TransportLayerException("DLE without sense detected.")
raise common.TransportLayerException(
"DLE without sense detected.")
# we add this byte to our apdu.
apdu += [b]
self.slog(header + apdu + [DLE, ETX] + crc, True)
@ -219,7 +232,7 @@ class SerialTransport(common.Transport):
self.write_ack()
return True, msg
else:
#self.write_nak()
# self.write_nak()
return False, msg
def receive(self, timeout=TIMEOUT_T2):
@ -258,16 +271,18 @@ class SerialTransport(common.Transport):
return True
elif acknowledge == ensure_bytes(chr(NAK)):
# not everything allright.
#if tries < 3:
# if tries < 3:
# return self.send_message(message, tries + 1, no_answer)
#else:
# else:
raise common.TransportLayerException("Could not send message")
elif not acknowledge:
# this happens quite a lot with the ingenico devices.
# possibly a workaround would be nice.
raise common.TransportTimeoutException("No Answer, Possible Timeout")
raise common.TransportTimeoutException(
"No Answer, Possible Timeout")
else:
raise common.TransportLayerException("Unknown Acknowledgment Byte %s" % conv.bs2hl(acknowledge))
raise common.TransportLayerException(
"Unknown Acknowledgment Byte %s" % conv.bs2hl(acknowledge))
def send(self, apdu, tries=0, no_wait=False):
"""
@ -275,6 +290,7 @@ class SerialTransport(common.Transport):
"""
return self.send_message(SerialMessage(apdu), tries, no_wait)
# self test
if __name__ == '__main__':
c = SerialTransport('/dev/ttyUSB0')

View file

@ -6,11 +6,13 @@ from ecrterm.transmission.transport_serial import *
class SerialTransportUnbuffered(SerialTransport):
class UnbufferedSerial(serial.Serial):
""" override Serial.read to use the *unbuffered* read function """
def read(self, size=1, timeout=None):
"""Read size bytes from the serial port. If a timeout is set it may
return less characters as requested. With no timeout it will block
until the requested number of bytes is read."""
if self.fd is None: raise serial.portNotOpenError
if self.fd is None:
raise serial.portNotOpenError
read = []
nread = 0
fd = self.fd
@ -20,13 +22,13 @@ class SerialTransportUnbuffered(SerialTransport):
timeout = self._timeout
if size > 0:
while nread < size:
#print "\tread(): size",size, "have", len(read) #debug
# print "\tread(): size",size, "have", len(read) #debug
ready, _, _ = select.select(fds, [], [], timeout)
if not ready:
break #timeout
break # timeout
buf = os.read(fd, size - nread)
if not buf:
break #early abort on timeout or error
break # early abort on timeout or error
read.append(buf)
nread += len(buf)
return ''.join(read)

View file

@ -53,7 +53,8 @@ class ZVTTransmission(Transmission):
whole sequence is finished.
"""
if not self.is_master or self.is_waiting:
raise TransmissionException("Can't send until transmisson is ready")
raise TransmissionException(
"Can't send until transmisson is ready")
self.is_master = False
try:
self.history += [(False, packet), ]
@ -65,7 +66,8 @@ class ZVTTransmission(Transmission):
self.is_master = self.handle_packet_response(packet, response)
if not self.is_master:
try:
success, response = self.transport.receive(self.actual_timeout)
success, response = self.transport.receive(
self.actual_timeout)
self.history += [(True, response)]
except common.TransportLayerException:
# some kind of timeout.
@ -77,7 +79,8 @@ class ZVTTransmission(Transmission):
return TRANSMIT_TIMEOUT
if self.is_master and success:
# we actually have to handle a last packet
stay_master = self.handle_packet_response(packet, response)
stay_master = self.handle_packet_response(
packet, response)
print("Is Master Read Ahead happened.")
self.is_master = stay_master
except Exception as e: