diff --git a/ecrterm/common.py b/ecrterm/common.py index 74b35fb..25b0939 100644 --- a/ecrterm/common.py +++ b/ecrterm/common.py @@ -276,6 +276,10 @@ DEBUG_PACKET_NAME = { } +def noop(*args, **kwargs): + pass + + class Logling(object): """A simple log interface.""" diff --git a/ecrterm/conv.py b/ecrterm/conv.py index ac28cd5..b8b4770 100644 --- a/ecrterm/conv.py +++ b/ecrterm/conv.py @@ -22,8 +22,10 @@ You should have received a copy of the GNU Lesser General Public License along with pyscard; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA """ +import re import sys from functools import reduce +from struct import unpack PACK = 1 HEX = 2 @@ -32,19 +34,20 @@ COMMA = 8 def padd(bytelist, length, padding='FF'): - """ Padds a byte list with a constant byte value (default is x0FF) - bytelist: the byte list to padd - length: the total length of the resulting byte list; - no padding if length is smaller than the byte list length - padding: padding value (default is 0xff) + """ + Padds a byte list with a constant byte value (default is x0FF) + bytelist: the byte list to padd + length: the total length of the resulting byte list; + no padding if length is smaller than the byte list length + padding: padding value (default is 0xff) - returns the padded bytelist - example: - padd(toBytes(\"3B 65 00 00 9C 11 01 01 03\"), 16) returns - [0x3B, 0x65, 0, 0, 0x9C, 0x11, 1, 1, 3, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0xFF, 0xFF] - padd(toBytes(\"3B 65 00 00 9C 11 01 01 03\"), 8) returns - [0x3B, 0x65, 0, 0, 0x9C, 0x11, 1, 1, 3] + returns the padded bytelist + example: + padd(toBytes(\"3B 65 00 00 9C 11 01 01 03\"), 16) returns + [0x3B, 0x65, 0, 0, 0x9C, 0x11, 1, 1, 3, 0xFF, 0xFF, 0xFF, 0xFF, + 0xFF, 0xFF, 0xFF] + padd(toBytes(\"3B 65 00 00 9C 11 01 01 03\"), 8) returns + [0x3B, 0x65, 0, 0, 0x9C, 0x11, 1, 1, 3] """ if len(bytelist) < length: @@ -99,8 +102,8 @@ def toBytes(bytestring): bytestring: a byte string of the format "3B 65 00 00 9C 11 01 01 03" or "3B6500009C11010103" or "3B6500 009C1101 0103" """ - from struct import unpack - import re + if type(bytestring) in set([bytes, bytearray]): + return list(bytestring) packedstring = ''.join(re.split('\W+', bytestring)) if sys.version_info[0] > 2 and isinstance(packedstring, str): packedstring = packedstring.encode() diff --git a/ecrterm/ecr.py b/ecrterm/ecr.py index 37a717f..4527a79 100644 --- a/ecrterm/ecr.py +++ b/ecrterm/ecr.py @@ -21,7 +21,7 @@ from ecrterm.transmission._transmission import Transmission from ecrterm.transmission.signals import ACK, DLE, ETX, NAK, STX, TRANSMIT_OK from ecrterm.transmission.transport_serial import SerialTransport from ecrterm.transmission.transport_socket import SocketTransport -from ecrterm.utils import is_stringlike +from ecrterm.utils import detect_pt_serial, is_stringlike class A(object): @@ -416,9 +416,9 @@ class ECR(object): def detect_pt(self): # note: this only executes utils.detect_pt with the local ecrterm. - from ecrterm.utils import detect_pt - result = detect_pt(silent=False, ecr=self, timeout=2) - return result + if type(self.transport) is SerialTransport: + return detect_pt_serial(timeout=2, silent=False, ecr=self) + return True def parse_str(self, s): return parse_represented_data(s) diff --git a/ecrterm/packets/apdu.py b/ecrterm/packets/apdu.py index 352cc43..8042119 100644 --- a/ecrterm/packets/apdu.py +++ b/ecrterm/packets/apdu.py @@ -237,7 +237,7 @@ class APDUPacket(object): if is_stringlike(blob): # lets convert our string into a bytelist. blob = toBytes(blob) - if isinstance(blob, list): + if type(blob) is list: # allright. # first we detect our packetclass PacketClass = Packets.detect(blob[:2]) diff --git a/ecrterm/packets/base_packets.py b/ecrterm/packets/base_packets.py index e40573b..428ee85 100644 --- a/ecrterm/packets/base_packets.py +++ b/ecrterm/packets/base_packets.py @@ -25,12 +25,12 @@ class Packet(APDUPacket): introspection, bitmap_stati) else: introspection = '**%s' % bitmap_stati - return "%s{%s %s} %s" % ( + return '%s{%s %s} %s' % ( self.__class__.__name__, toHexString([self.cmd_class]), toHexString([self.cmd_instr]), introspection) def _handle_unknown_response(self, response, tm): - print("Unknown packet response %s" % response) + print('Unknown packet response %s' % response) tm.send_received() return False @@ -105,15 +105,15 @@ class Registration(Packet): # look thru all arguments: all needed fixed arguments here? if len(self.fixed_values) < 2: raise Exception( - "Registration Packet needs passwort and config_byte at least") + 'Registration Packet needs passwort and config_byte at least') elif len(self.fixed_values) < 3 and len(self.bitmaps) > 0: - raise Exception("Registration Packet needs CC if you add a bitmap") + raise Exception('Registration Packet needs CC if you add a bitmap') # look thru all bitmaps: all bitmaps allowed? return True def consume_fixed(self, data, length): if length < 4: - raise Exception("Registration needs at least 4 bytes.") + raise Exception('Registration needs at least 4 bytes.') if length >= 4: # only password and byte # no cc @@ -155,8 +155,8 @@ class Registration(Packet): ret |= 0x8 else: print( - "Note: intermediate status not requested, but mandatory in " - "CardComplete Terminals") + 'Note: intermediate status not requested, but mandatory in ' + 'CardComplete Terminals') if ecr_controls_payment: ret |= 0x10 # 0010 0000 @@ -236,7 +236,7 @@ class Initialisation(Packet): Network-Initialization. """ cmd_instr = 0x93 - fixed_arguments = ['password', ] + fixed_arguments = ['password'] fixed_values = {'password': '123456'} wait_for_completion = True @@ -471,7 +471,7 @@ class IntermediateStatusInformation(Packet): return data def __repr__(self): - return "IntermediateStatus{04 FF}: %s" % ( + return 'IntermediateStatus{04 FF}: %s' % ( INTERMEDIATE_STATUS_CODES.get( self.fixed_values.get('intermediate_status', None), 'No status')) @@ -511,7 +511,7 @@ class PacketReceivedError(Packet): self.cmd_instr = error_code def __repr__(self): - return "PacketReceivedERROR{84 %s}: %s" % ( + return 'PacketReceivedERROR{84 %s}: %s' % ( toHexString([self.error_code]), ERRORCODES.get(self.error_code, 'Unknown Error'), ) diff --git a/ecrterm/transmission/_transmission.py b/ecrterm/transmission/_transmission.py index 9ea4792..4e4104a 100644 --- a/ecrterm/transmission/_transmission.py +++ b/ecrterm/transmission/_transmission.py @@ -5,7 +5,7 @@ Transmission Basics. from ecrterm.exceptions import TransmissionException, TransportLayerException from ecrterm.packets.base_packets import PacketReceived from ecrterm.transmission.signals import ( - TIMEOUT_T4_DEFAULT, TRANSMIT_ERROR, TRANSMIT_OK, TRANSMIT_TIMEOUT) + TIMEOUT_T4_DEFAULT, TRANSMIT_OK, TRANSMIT_TIMEOUT) class Transmission(object): @@ -65,9 +65,9 @@ class Transmission(object): self.is_master = False self.last = packet try: - history += [(False, packet), ] + history += [(False, packet)] success, response = self.transport.send(packet) - history += [(True, response), ] + history += [(True, response)] # we sent the packet. # now lets wait until we get master back. while not self.is_master: @@ -95,6 +95,5 @@ class Transmission(object): except Exception as e: self.is_master = True raise - return TRANSMIT_ERROR self.is_master = True return TRANSMIT_OK diff --git a/ecrterm/transmission/transport_serial.py b/ecrterm/transmission/transport_serial.py index d2ea0d9..4e19bd3 100644 --- a/ecrterm/transmission/transport_serial.py +++ b/ecrterm/transmission/transport_serial.py @@ -8,7 +8,7 @@ The Serial Layer is a transport used for import serial -from ecrterm.common import Transport +from ecrterm.common import Transport, noop from ecrterm.conv import bs2hl, hl2bs, toBytes, toHexString from ecrterm.crc import crc_xmodem16 from ecrterm.exceptions import ( @@ -33,15 +33,11 @@ def std_serial_log(instance, data, incoming=False): print('| error in log') -def noop(*args, **kwargs): - pass - - class SerialMessage(object): """ - Converts a Packet into a serial message by serializing the packet - and inserting it into the final Serial Packet - CRC and double-DLEs included. + Converts a Packet into a serial message by serializing the packet + and inserting it into the final Serial Packet + CRC and double-DLEs included. """ apdu = None diff --git a/ecrterm/transmission/transport_socket.py b/ecrterm/transmission/transport_socket.py index 3911fdd..411815b 100644 --- a/ecrterm/transmission/transport_socket.py +++ b/ecrterm/transmission/transport_socket.py @@ -1,19 +1,114 @@ -from socket import create_connection +from binascii import hexlify +from socket import SHUT_RDWR, create_connection +from socket import timeout as exc_timeout +from struct import unpack +from typing import Tuple -from ecrterm.common import Transport +from ecrterm.common import Transport, noop +from ecrterm.conv import bs2hl +from ecrterm.packets.apdu import APDUPacket + + +def hexformat(data: bytes) -> str: + """Return a prettified binary data.""" + hexlified = str(hexlify(data), 'ascii') + splitted = ':'.join( + hexlified[i:i + 2] for i in range(0, len(hexlified), 2)) + return repr(bytes(data)) + ' -> ' + splitted class SocketTransport(Transport): """Transport for TCP/IP.""" insert_delays = False + slog = noop - def __init__(self, uri: str): + def __init__(self, uri: str, debug: bool=False): """Setup the IP and Port.""" prefix, ip, port = uri.split(':') self.port = int(port) self.ip = ip[2:] # Trim '//' from the beginning + self._debug = debug - def connect(self, timeout: int=30): - """Connect to the TCP socket.""" - self.sock = create_connection( - address=(self.ip, self.port), timeout=timeout) + def connect(self, timeout: int=30) -> bool: + """ + Connect to the TCP socket. Return `True` on successful + connection, `False` on an unsuccessful one. + """ + try: + self.sock = create_connection( + address=(self.ip, self.port), timeout=timeout) + return True + except (ConnectionError, exc_timeout) as exc: + return False + + def send(self, apdu, tries: int=0, no_wait: bool=False): + """Send data.""" + to_send = bytes(apdu.to_list()) + self.slog(data=bs2hl(binstring=to_send), incoming=False) + total_sent = 0 + msglen = len(to_send) + while total_sent < msglen: + sent = self.sock.send(to_send[total_sent:]) + if self._debug: + print('sent', sent, 'bytes of', hexformat( + data=to_send[total_sent:])) + if sent == 0: + raise RuntimeError('Socket connection broken.') + total_sent += sent + if no_wait: + return True + return self.receive() + + def _receive_bytes(self, length: int) -> bytes: + """Receive and return a fixed amount of bytes.""" + recv_bytes = 0 + result = b'' + if self._debug: + print('waiting for', length, 'bytes') + while recv_bytes < length: + chunk = self.sock.recv(length - recv_bytes) + if self._debug: + print('received', len(chunk), 'bytes:', hexformat(data=chunk)) + if chunk == b'': + raise RuntimeError('Socket connection broken.') + result += chunk + recv_bytes += len(chunk) + return result + + def _receive_length(self) -> Tuple[bytes, int]: + """ + Receive the 4 bytes on the socket which indicates the message + length, and return the packed and the `int` converted length. + """ + data = self._receive_bytes(length=3) + length = data[2] + if length != 0xff: + return data, length + # Need to get 2 more bytes + length = self._receive_bytes(length=2) + data += length + return data, unpack(' bytes: + """ + Receive the response from the terminal and return is as `bytes`. + """ + data, length = self._receive_length() + if not length: # Length is 0 + return data + new_data = self._receive_bytes(length=length) + return data + new_data + + def receive( + self, timeout=None, *args, **kwargs) -> Tuple[bool, APDUPacket]: + """ + Receive data, return success status and ADPUPacket instance. + """ + data = self._receive() + self.slog(data=bs2hl(binstring=data), incoming=True) + return True, APDUPacket.parse(blob=data) + + def close(self): + """Shutdown and close the connection.""" + self.sock.shutdown(SHUT_RDWR) + self.sock.close() diff --git a/ecrterm/utils.py b/ecrterm/utils.py index 7fea824..eabfafe 100644 --- a/ecrterm/utils.py +++ b/ecrterm/utils.py @@ -18,7 +18,7 @@ def ensure_bytes(v): return v -def detect_pt(device='/dev/ttyUSB0', timeout=2, silent=True, ecr=None): +def detect_pt_serial(device='/dev/ttyUSB0', timeout=2, silent=True, ecr=None): """ connects to given serial port and tests if a PT is present. if present: tries to return version number or True @@ -35,7 +35,7 @@ def detect_pt(device='/dev/ttyUSB0', timeout=2, silent=True, ecr=None): if StatusEnquiry is None: from ecrterm.packets.base_packets import StatusEnquiry, Completion - def __detect_pt(port, timeout, ecr): + def __detect_pt_serial(port, timeout, ecr): e = ecr or ECR(port) # reconnect to have lower timeout e.transport.close() @@ -54,15 +54,15 @@ def detect_pt(device='/dev/ttyUSB0', timeout=2, silent=True, ecr=None): e.transport.connect() if silent: try: - return __detect_pt(device, timeout, ecr) + return __detect_pt_serial(device, timeout, ecr) except Exception: return False else: - return __detect_pt(device, timeout, ecr) + return __detect_pt_serial(device, timeout, ecr) if __name__ == '__main__': - if detect_pt(): + if detect_pt_serial(): print("PT is online at ttyUSB0") else: print("PT cant be found at ttyUSB0") diff --git a/test_pt.py b/test_pt.py index 098bbf8..567e00c 100755 --- a/test_pt.py +++ b/test_pt.py @@ -1,46 +1,48 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- """ - Example script to demonstrate a payment process. +Example script to demonstrate a payment process. """ -import time - -from ecrterm import ecr, packets, transmission +from ecrterm.ecr import ECR, ecr_log +from ecrterm.packets.base_packets import Registration if __name__ == '__main__': def printer(lines_of_text): for line in lines_of_text: print(line) - e = ecr.ECR(device='/dev/ttyUSB0') + e = ECR(device='socket://192.168.1.163:20007') # reenable logging: - e.transport.slog = ecr.ecr_log + e.transport.slog = ecr_log print(e.detect_pt()) if e.detect_pt(): - e.register(config_byte=packets.base_packets.Registration.generate_config( - ecr_prints_receipt=False, - ecr_prints_admin_receipt=False, - ecr_controls_admin=True, - ecr_controls_payment=True,)) + e.register(config_byte=Registration.generate_config( + ecr_prints_receipt=False, + ecr_prints_admin_receipt=False, + ecr_controls_admin=True, + ecr_controls_payment=True)) status = e.status() if status: - print("Status code of PT is %s" % status) + print('Status code of PT is %s' % status) # laut doku sollte 0x9c bedeuten, ein tagesabschluss erfolgt # bis jetzt unklar ob er es von selbst ausführt. if status == 0x9c: - print("End Of Day") + print('End Of Day') e.end_of_day() # last_printout() would work too: printer(e.daylog) else: - print("Unknown Status Code: %s" % status) + print('Unknown Status Code: %s' % status) # status == 0xDC for ReadCard (06 C0) -> Karte drin. 0x9c karte draussen. if e.payment(50): printer(e.last_printout()) e.wait_for_status() - e.show_text(lines=['Auf Wiedersehen!', ' ', 'Zahlung erfolgt'], beeps=0) + e.show_text( + lines=['Auf Wiedersehen!', ' ', 'Zahlung erfolgt'], beeps=0) else: e.wait_for_status() - e.show_text(lines=['Auf Wiedersehen!', ' ', 'Vorgang abgebrochen'], beeps=1) + e.show_text( + lines=['Auf Wiedersehen!', ' ', 'Vorgang abgebrochen'], + beeps=1)