# Komunikasi HW-VX6330K Menggunakan Python
HW-VX6330K adalah salah satu UHF reader middle range yang dapat melakukan read maupun write terhadap tag. Komunikasi dengan reader dapat menggunakan bahasa pemrograman apapun asalkan komunikasi sudah sesuai dengan protokol yang ada.
Untuk detail produk dapat klik link berikut ini Electron HW-VX6330K (opens new window).
Pada artikel ini kami mencoba berkomunikasi dengan reader menggunakan bahasa pemrograman Python langsung melalui protokolnya (tanpa SDK), dengan koneksi via Serial atau TCP/IP.
# Persiapan
Sebelum masuk ke bagian kode, perlu disiapkan beberapa hal berikut:
- Dokumentasi protokol (opens new window) sebagai pedoman
- Konverter RS232 to USB Serial / Kabel Ethernet
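- Python versi 3.10 ke atas (kode contoh memakai f-string dan anotasi tipe seperti `bytes | int | None`, yang tidak tersedia di versi yang lebih lama)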
- Library pyserial (opens new window), digunakan untuk komunikasi Serial (RS232)
pip install pyserial
Konverter dapat dibeli di link berikut USB to RS232 CH340 Serial (opens new window).
# Hubungkan Reader ke Komputer
# [Serial] Konverter RS232 to Serial
Proses menghubungkan HW-VX6330K dengan komputer cukup sambungkan dengan menggunakan kabel konverter RS232 to USB Serial.
Setelah terhubung pastikan sudah terdeteksi adanya serial port di Device Manager.
# [TCP/IP] Kabel Ethernet
Untuk menghubungkan HW-VX6330K dengan komputer menggunakan kabel ethernet, kita perlu samakan dulu IP Network komputer terhadap reader.
Default IP Address reader adalah `192.168.1.192/24` dengan port `6000`.
Bagi pengguna Windows, bisa di atur di Control Panel > Network and Sharing Center > Pilih adapter lalu sesuaikan IP Network-nya.
Contoh: IP reader `192.168.1.192/24`, IP komputer `192.168.1.1/24`.
# Kode dan Penjelasan
Penjelasan blok/tiap byte command/response dapat dilihat di dokumentasi protokol (opens new window).
Kode di bawah ini dipisah menjadi 5 file:
- `transport.py`: mengatur penerimaan dan pengiriman byte dari/ke reader.
- `command.py`: proses parsing pengiriman byte ke reader.
- `response.py`: proses parsing penerimaan byte dari reader.
- `reader.py`: menggabungkan 3 class di atas.
- `main.py`: file yang akan dijalankan Python.
# 1. Transport (`transport.py`)
Class ini yang mengatur pengiriman dan penerimaan byte dari/ke reader. Pada contoh, kita akan coba buat 1 base class (dasar), 2 derived class (turunan) yakni untuk Serial dan TCP/IP.
from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
from typing import TypeVar
import serial
# NOTE(review): T is not referenced by any code visible in this file, and its
# bound 'Parent' names no class defined here - confirm whether it is still needed.
T = TypeVar('T', bound='Parent')
class Transport(ABC):
    """Abstract byte channel to the reader.

    Subclasses provide the raw ``read_bytes`` / ``write_bytes`` / ``close``
    primitives; ``read_frame`` builds on ``read_bytes`` to return one complete
    length-prefixed frame.
    """

    @abstractmethod
    def read_bytes(self, length: int) -> bytes:
        """Read up to ``length`` bytes from the channel.

        Implementations may return fewer bytes than requested, or an empty
        value when no data arrived; ``read_frame`` copes with both.
        """
        raise NotImplementedError

    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        """Send ``buffer`` to the reader."""
        raise NotImplementedError

    def read_frame(self) -> bytes:
        """Read one frame: a length byte followed by that many bytes.

        Returns:
            The length byte plus the bytes that follow it, as a ``bytearray``.

        Raises:
            TimeoutError: if the channel stops delivering data before the
                frame is complete (previously this surfaced as an
                ``IndexError`` or as a silently truncated frame).
        """
        header = self._read_exact(1)
        # The first byte counts the bytes that follow it in the frame.
        return header + self._read_exact(header[0])

    def _read_exact(self, length: int) -> bytearray:
        """Keep calling ``read_bytes`` until exactly ``length`` bytes arrived."""
        received = bytearray()
        while len(received) < length:
            chunk = self.read_bytes(length - len(received))
            if not chunk:
                raise TimeoutError(
                    f'expected {length} byte(s) but received {len(received)} '
                    'before the channel stopped delivering data'
                )
            received.extend(chunk)
        return received

    @abstractmethod
    def close(self) -> None:
        """Release the underlying channel."""
        raise NotImplementedError
class TcpTransport(Transport):
    """Transport that talks to the reader over a TCP socket."""

    def __init__(self, ip_address: str, port: int, timeout: int = 1) -> None:
        """Open a TCP connection to ``ip_address:port``.

        ``timeout`` is applied to the socket with ``settimeout`` before
        connecting, so it also bounds the connection attempt.
        """
        self.socket = socket(AF_INET, SOCK_STREAM)
        try:
            self.socket.settimeout(timeout)
            self.socket.connect((ip_address, port))
        except Exception:
            # Do not leak the file descriptor when the reader is unreachable.
            self.socket.close()
            raise

    def read_bytes(self, length: int) -> bytes:
        """Read ``length`` bytes, looping over partial ``recv`` results.

        A single ``recv`` may return fewer bytes than requested, so keep
        reading until the request is satisfied. Returns fewer bytes only if
        the peer closes the connection first; a socket timeout propagates as
        the exception raised by ``recv``.
        """
        chunks = []
        remaining = length
        while remaining > 0:
            chunk = self.socket.recv(remaining)
            if not chunk:  # peer closed the connection
                break
            chunks.append(chunk)
            remaining -= len(chunk)
        return b"".join(chunks)

    def write_bytes(self, buffer: bytes) -> None:
        """Send the whole ``buffer`` to the reader."""
        self.socket.sendall(buffer)

    def close(self) -> None:
        """Close the socket."""
        self.socket.close()
class SerialTransport(Transport):
    """Transport that talks to the reader over a serial (RS232) port."""

    def __init__(self, serial_port: str, baud_rate: int, timeout: int = 1) -> None:
        """Open ``serial_port`` at ``baud_rate``.

        The same ``timeout`` value is used for both reads and writes.
        """
        self.serial = serial.Serial(
            port=serial_port,
            baudrate=baud_rate,
            timeout=timeout,
            write_timeout=timeout,
        )

    def read_bytes(self, length: int) -> bytes:
        """Return whatever the port's ``read(length)`` call yields."""
        received = self.serial.read(length)
        return received

    def write_bytes(self, buffer: bytes) -> None:
        """Write ``buffer`` to the port."""
        self.serial.write(buffer)

    def close(self) -> None:
        """Close the serial port."""
        self.serial.close()
# 2. Command (`command.py`)
Class ini akan mempermudah proses parsing data byte yang akan dikirim ke reader. List command yang dapat digunakan dapat dilihat di dokumentasi protokol (opens new window), pada bagian 4.1 EPC C1 G2(ISO18000-6C)COMMAND.
Pada class ini terdapat kode untuk menghitung algoritma checksum, algoritma checksum yang digunakan pada reader HW-VX6330K adalah CRC-16/MCRF4XX.
# Command codes: the value placed in the command byte of a request frame
# (third byte of Command.base_data, after frame length and reader address).
CMD_INVENTORY = 0x01  # used by Reader.inventory
CMD_READ_MEMORY = 0x02  # used by Reader.read_memory
CMD_WRITE_MEMORY = 0x03  # used by Reader.write_memory
CMD_READER_INFORMATION = 0x21  # not used by the Reader class shown in this article
CMD_SET_READER_POWER = 0x2F  # used by Reader.set_power
class Command:
def __init__(self, command: int, reader_address: int = 0xFF,
data: bytes | int | None = None):
self.command = command
self.reader_address = reader_address
self.data = data
if isinstance(data, int):
self.data = bytearray([data])
if data is None:
self.data = bytearray()
self.frame_length = 4 + len(self.data)
self.base_data = bytearray([self.frame_length, self.reader_address, self.command])
self.base_data.extend(self.data)
def serialize(self) -> bytes:
serialize = self.base_data
# Checksum CRC-16/MCRF4XX
value = 0xFFFF
for d in serialize:
value ^= d
for _ in range(8):
value = (value >> 1) ^ 0x8408 if value & 0x0001 else (value >> 1)
crc_msb = value >> 0x08
crc_lsb = value & 0xFF
serialize = serialize + bytes([crc_lsb])
serialize = serialize + bytes([crc_msb])
return serialize
# 3. Response (`response.py`)
class Response:
    """A response frame received from the reader, split into its fields.

    Layout assumed by the slicing below::

        [length, reader_address, command, status, *data, checksum(2 bytes)]
    """

    def __init__(self, response_bytes: bytes):
        """Slice ``response_bytes`` into the individual frame fields."""
        self.response_bytes = response_bytes
        self.length = response_bytes[0]
        self.reader_address = response_bytes[1]
        self.command = response_bytes[2]
        # Status codes: see "5. LIST OF COMMAND EXECUTION RESULT STATUS".
        self.status = response_bytes[3]
        # Everything between the status byte and the trailing checksum.
        self.data = response_bytes[4:-2]
        self.checksum = response_bytes[-2:]

    def __str__(self) -> str:
        """Return a multi-line, human-readable dump of the frame fields.

        The DATA line is omitted when the frame carries no data.
        """
        lines = [
            '>>> START RESPONSE ================================',
            f'RESPONSE >> {hex_readable(self.response_bytes)}',
            f'READER ADDRESS >> {hex_readable(self.reader_address)}',
            f'COMMAND >> {hex_readable(self.command)}',
            f'STATUS >> {hex_readable(self.status)}',
        ]
        if self.data:
            lines.append(f'DATA >> {hex_readable(self.data)}')
        lines.append(f'CHECKSUM >> {hex_readable(self.checksum)}')
        lines.append('>>> END RESPONSE ================================')
        return '\n'.join(lines)
def hex_readable(data: bytes | int, bytes_separator: str = " ") -> str:
if isinstance(data, int):
return "{:02X}".format(data)
return bytes_separator.join("{:02X}".format(x) for x in data)
# 4. Reader (`reader.py`)
Pada class Reader berikut yang kami demokan adalah proses:
- Inventory (Answer Mode). Jangan lupa set work mode ke Answer Mode pada program demo.
- Read memory (read memory bank TID)
- Write memory (write memory bank EPC)
- Set power (jarak baca reader terhadap tag)
Perbedaan antar memory bank dapat dibaca di artikel berikut Memory Bank EPC UHF Gen2 Air Interface Protocol (opens new window).
from typing import Iterator
from transport import Transport
from command import *
from response import *
class Reader:
    """High-level reader commands built on top of a ``Transport``.

    Every command serialises a ``Command``, writes it to the transport and
    parses the next frame read from the transport as a ``Response``.
    """

    def __init__(self, transport: Transport) -> None:
        self.transport = transport

    def close(self) -> None:
        """Close the underlying transport."""
        self.transport.close()

    def __send_request(self, command: Command) -> None:
        """Serialise ``command`` and write it to the transport."""
        self.transport.write_bytes(command.serialize())

    def __get_response(self) -> bytes:
        """Read one raw response frame from the transport."""
        return self.transport.read_frame()

    def inventory(self) -> Iterator[bytes]:  # 8.2.1 Inventory (Answer Mode)
        """Yield the EPC of every tag reported by one inventory command.

        The response data block is ``[tag_count, len_1, epc_1, len_2, epc_2, ...]``.
        This is a generator: the request is only sent once iteration starts.
        Yields nothing when the response carries no data block (presumably an
        error / no-tag status - check ``status`` codes in the protocol
        document); previously that case raised ``IndexError``.
        """
        self.__send_request(Command(CMD_INVENTORY))
        response = Response(self.__get_response())
        data = response.data
        if not data:
            return

        tag_count = data[0]
        pointer = 1  # index of the length byte of the next tag
        for _ in range(tag_count):
            tag_len = data[pointer]
            tag_start = pointer + 1
            pointer = tag_start + tag_len
            yield data[tag_start:pointer]

    def read_memory(self, epc: bytes, memory_bank: int, start_address: int, length: int,
                    access_password: bytes = bytes(4)) -> Response:  # 8.2.2 Read Data
        """Send a read-memory command for the tag identified by ``epc``.

        Request data layout: EPC length in words, EPC, memory bank, start
        address, length, access password (defaults to four zero bytes).
        """
        request_data = bytearray([len(epc) // 2])  # EPC length in words
        request_data.extend(epc)
        request_data.extend(bytearray([memory_bank, start_address, length]))
        request_data.extend(access_password)
        self.__send_request(Command(CMD_READ_MEMORY, data=request_data))
        return Response(self.__get_response())

    def write_memory(self, epc: bytes, memory_bank: int, start_address: int,
                     data_to_write: bytes,
                     access_password: bytes = bytes(4)) -> Response:  # 8.2.4 Write Data
        """Send a write-memory command for the tag identified by ``epc``.

        Request data layout: data length in words, EPC length in words, EPC,
        memory bank, start address, data to write, access password (defaults
        to four zero bytes).
        """
        request_data = bytearray([
            len(data_to_write) // 2,  # data length in words
            len(epc) // 2,  # EPC length in words
        ])
        request_data.extend(epc)
        request_data.extend(bytearray([memory_bank, start_address]))
        request_data.extend(data_to_write)
        request_data.extend(access_password)
        self.__send_request(Command(CMD_WRITE_MEMORY, data=request_data))
        return Response(self.__get_response())

    def set_power(self, power: int) -> Response:  # 8.4.6 Set Power
        """Set the reader power; ``power`` must be within 0..30.

        Raises:
            ValueError: if ``power`` is out of range. An explicit raise is
                used instead of ``assert`` so the check survives ``python -O``.
        """
        if not 0 <= power <= 30:
            raise ValueError(f'power must be between 0 and 30, got {power}')
        self.__send_request(Command(CMD_SET_READER_POWER, data=bytearray([power])))
        return Response(self.__get_response())
Pada fungsi `inventory()`, blok data pada response berisi beberapa EPC sekaligus; byte pertama (`response.data[0]`) menentukan berapa jumlah tag yang terdapat di sekitar reader. Setiap EPC didahului 1 byte yang menyatakan panjangnya.
# 5. main.py
File ini yang akan dijalankan pertama kali (`python main.py`); di sini kita memanggil class-class yang sudah dibuat.
Jika koneksi via TCP/IP, Anda dapat menggunakan class `TcpTransport()` sebagai parameter ke `Reader`.
from response import hex_readable
from transport import SerialTransport, TcpTransport
from reader import Reader
# Demo script: runs inventory, read, write and set-power against one reader.
transport = SerialTransport('COM6', 57600)
# transport = TcpTransport('192.168.1.192', 6000)
reader = Reader(transport)

# Always release the port/socket, even when one of the demo commands raises.
try:
    #########################################################
    # 1. Inventory - Answer Mode
    tags = reader.inventory()
    for epc in tags:
        print(f'EPC: {hex_readable(epc)}')

    #########################################################
    # 2. Read specific memory, read memory bank TID (0x02), please check 8.2.2 Read Data
    # EPC: 12 34 56 78 90 12 34 56 AD EF 01 23
    epc = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
    response_read = reader.read_memory(epc=epc, memory_bank=0x02, start_address=0x00, length=0x07)
    print(response_read)

    #########################################################
    # 3. Write specific memory, write memory bank EPC (0x01), please check 8.2.3 Write Data
    epc = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
    data_to_write = bytearray([0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00, 0x99, 0x88])
    response_write = reader.write_memory(epc=epc, memory_bank=0x01, start_address=0x02, data_to_write=data_to_write)
    print(response_write)

    #########################################################
    # 4. Set power, please check 8.4.6 Set Power
    power = 28
    response_set_power = reader.set_power(power)
    print(response_set_power)
    #########################################################
finally:
    reader.close()
# Informasi lainnya
- Untuk kode lebih lanjut, silakan buat fungsi tambahan pada class `Reader` yang mengacu ke dokumentasi protokol.
- CRC calculator.