HW-VX6330K adalah salah satu UHF reader middle range yang dapat melakukan read maupun write terhadap tag. Komunikasi dengan reader dapat menggunakan bahasa pemrograman apapun asalkan komunikasi sudah sesuai dengan protokol yang ada.
Pada artikel ini kami mencoba komunikasikan reader menggunakan bahasa pemrograman Python menggunakan protokol (tidak menggunakan SDK), koneksi via Serial atau TCP/IP .
Persiapan §
Sebelum masuk ke bagian kode, perlu disiapkan beberapa hal berikut:
Dokumentasi protokol sebagai pedoman
Konverter RS232 to USB Serial / Kabel Ethernet
Python versi 3.11 ke atas
Library pyserial , digunakan untuk komunikasi Serial (RS232)
Konverter dapat dibeli di link berikut USB to RS232 CH340 Serial .
Hubungkan Reader ke Komputer §
RS232 §
Proses menghubungkan HW-VX6330K dengan komputer cukup sambungkan dengan menggunakan kabel konverter RS232 to USB Serial.
Setelah terhubung pastikan sudah terdeteksi adanya serial port di Device Manager.
TCP/IP §
Untuk menghubungkan HW-VX6330K dengan komputer menggunakan kabel ethernet, kita perlu samakan dulu IP Network komputer terhadap reader.
Default IP Address reader adalah 192.168.1.192/24
dengan port 6000
.
Bagi pengguna Windows, bisa di atur di Control Panel > Network and Sharing Center > Pilih adapter lalu sesuaikan IP Network-nya.
Contoh: IP reader 192.168.1.192/24
, IP komputer: 192.168.1.1/24
.
Kode dan Penjelasan §
Penjelasan blok/tiap byte command/response dapat dilihat di dokumentasi protokol .
Kode dibawah ini dipisah menjadi 4 file:
transport.py
: mengatur penerimaan dan pengiriman byte dari/ke reader.
command.py
: proses parsing pengiriman byte ke reader.
response.py
: proses parsing penerimaan byte dari reader.
reader.py
: menggabungkan 3 class diatas.
main.py
: file yang akan dijalankan Python.
1. Transport (transport.py
) §
Class ini yang mengatur pengiriman dan penerimaan byte dari/ke reader.
Pada contoh, kita akan coba buat 1 base class (dasar), 2 derived class (turunan) yakni untuk Serial dan TCP/IP.
from abc import ABC , abstractmethod
from socket import socket, AF_INET , SOCK_STREAM
from typing import TypeVar
import serial
T = TypeVar( 'T' , bound = 'Parent' )
class Transport ( ABC ):
@abstractmethod
def read_bytes (self, length: int ) -> bytes :
raise NotImplementedError
@abstractmethod
def write_bytes (self, buffer: bytes ) -> None :
raise NotImplementedError
def read_frame (self) -> bytes | None :
length_bytes = self .read_bytes( 1 )
if not length_bytes:
return
frame_length = ord ( chr (length_bytes[ 0 ]))
data = length_bytes + self .read_bytes(frame_length)
return bytearray (data)
@abstractmethod
def close (self) -> None :
raise NotImplementedError
class TcpTransport ( Transport ):
def __init__ (self, ip_address: str , port: int , timeout: int = 1 ) -> None :
self .socket = socket( AF_INET , SOCK_STREAM )
self .socket.settimeout(timeout)
self .socket.connect((ip_address, port))
def read_bytes (self, length: int ) -> bytes :
return self .socket.recv(length)
def write_bytes (self, buffer: bytes ) -> None :
self .socket.sendall(buffer)
def close (self) -> None :
self .socket.close()
class SerialTransport ( Transport ):
def __init__ (self, serial_port: str , baud_rate: int , timeout: int = 1 ) -> None :
self .serial = serial.Serial(serial_port, baud_rate,
timeout = timeout, write_timeout = timeout)
def read_bytes (self, length: int ) -> bytes :
return self .serial.read(length)
def write_bytes (self, buffer: bytes ) -> None :
self .serial.write(buffer)
def close (self) -> None :
self .serial.close()
2. Command (command.py
) §
Class ini akan mempermudah proses parsing data byte yang akan dikirim ke reader. List command yang dapat digunakan dapat dilihat di dokumentasi protokol , pada bagian 4.1 EPC C1 G2(ISO18000-6C)COMMAND .
Pada class ini terdapat kode untuk menghitung algoritma checksum, algoritma checksum yang digunakan pada reader HW-VX6330K adalah CRC-16/MCRF4XX .
CMD_INVENTORY = 0x 01
CMD_READ_MEMORY = 0x 02
CMD_WRITE_MEMORY = 0x 03
CMD_READER_INFORMATION = 0x 21
CMD_SET_READER_POWER = 0x 2F
class Command :
def __init__ (self, command: int , reader_address: int = 0x FF ,
data: bytes | int | None = None ):
self .command = command
self .reader_address = reader_address
self .data = data
if isinstance (data, int ):
self .data = bytearray ([data])
if data is None :
self .data = bytearray ()
self .frame_length = 4 + len ( self .data)
self .base_data = bytearray ([ self .frame_length, self .reader_address, self .command])
self .base_data.extend( self .data)
def serialize (self) -> bytes :
serialize = self .base_data
# Checksum CRC-16/MCRF4XX
value = 0x FFFF
for d in serialize:
value ^= d
for _ in range ( 8 ):
value = (value >> 1 ) ^ 0x 8408 if value & 0x 0001 else (value >> 1 )
crc_msb = value >> 0x 08
crc_lsb = value & 0x FF
serialize = serialize + bytes ([crc_lsb])
serialize = serialize + bytes ([crc_msb])
return serialize
3. Response (response.py
) §
class Response :
def __init__ (self, response_bytes: bytes ):
self .response_bytes = response_bytes
self .length = response_bytes[ 0 ]
self .reader_address = response_bytes[ 1 ]
self .command = response_bytes[ 2 ]
self .status = response_bytes[ 3 ] # Check 5. LIST OF COMMAND EXECUTION RESULT STATUS
self .data = response_bytes[ 4 : - 2 ]
self .checksum = response_bytes[ - 2 :]
def __str__ (self) -> str :
return_value = ''
value = '>>> START RESPONSE ================================'
return_value = f ' { return_value }\n{ value } '
value = f 'RESPONSE >> { hex_readable( self .response_bytes) } ' # Response
return_value = f ' { return_value }\n{ value } '
value = f 'READER ADDRESS >> { hex_readable( self .reader_address) } ' # Reader Address
return_value = f ' { return_value }\n{ value } '
value = f 'COMMAND >> { hex_readable( self .command) } ' # Command
return_value = f ' { return_value }\n{ value } '
value = f 'STATUS >> { hex_readable( self .status) } ' # Status
return_value = f ' { return_value }\n{ value } '
if self .data:
value = f 'DATA >> { hex_readable( self .data) } ' # Data
return_value = f ' { return_value }\n{ value } '
value = f 'CHECKSUM >> { hex_readable( self .checksum) } ' # Checksum
return_value = f ' { return_value }\n{ value } '
value = '>>> END RESPONSE ================================'
return_value = f ' { return_value }\n{ value } '
return return_value.strip()
def hex_readable (data: bytes | int , bytes_separator: str = " " ) -> str :
if isinstance (data, int ):
return " { :02X } " .format(data)
return bytes_separator.join( " { :02X } " .format(x) for x in data)
4. Reader (reader.py
) §
from typing import Iterator
from transport import Transport
from command import *
from response import *
class Reader :
def __init__ (self, transport: Transport) -> None :
self .transport = transport
def close (self) -> None :
self .transport.close()
def __send_request (self, command: Command) -> None :
self .transport.write_bytes(command.serialize())
def __get_response (self) -> bytes :
return self .transport.read_frame()
def inventory_answer_mode (self) -> Iterator[ bytes ]: # 8.2.1 Inventory (Answer Mode)
command: Command = Command( CMD_INVENTORY )
self .__send_request(command)
response: Response = Response( self .__get_response())
tag_count: int = response.data[ 0 ]
data: bytes = response.data
n: int = 0
pointer: int = 1
while n < tag_count:
tag_len = int (data[pointer])
tag_data_start = pointer + 1
tag_main_start = tag_data_start
tag_main_end = tag_main_start + tag_len
next_tag_start = tag_main_end
tag = data[tag_data_start:tag_main_start] \
+ data[tag_main_start:tag_main_end] + data[tag_main_end:next_tag_start]
yield tag
pointer = next_tag_start
n += 1
def inventory_active_mode (self) -> Iterator[Response]:
while True :
try :
raw_response: bytes | None = self .__get_response()
except TimeoutError :
continue
if raw_response is None :
continue
response: Response = Response(raw_response)
yield response
def read_memory (self, epc: bytes , memory_bank: int , start_address: int , length: int ,
access_password: bytes = bytes ( 4 )) -> Response: # 8.2.2 Read Data
request_data = bytearray ()
request_data.extend( bytearray ([ int ( len (epc) / 2 )])) # EPC Length in word
request_data.extend(epc)
request_data.extend( bytearray ([memory_bank, start_address, length]))
request_data.extend(access_password)
command: Command = Command( CMD_READ_MEMORY , data = request_data)
self .__send_request(command)
return Response( self .__get_response())
def write_memory (self, epc: bytes , memory_bank: int , start_address: int ,
data_to_write: bytes ,
access_password: bytes = bytes ( 4 )) -> Response: # 8.2.4 Write Data
request_data: bytearray = bytearray ()
request_data.extend( bytearray ([ int ( len (data_to_write) / 2 )])) # Data length in word
request_data.extend( bytearray ([ int ( len (epc) / 2 )])) # EPC Length in word
request_data.extend(epc)
request_data.extend( bytearray ([memory_bank, start_address]))
request_data.extend(data_to_write)
request_data.extend(access_password)
command: Command = Command( CMD_WRITE_MEMORY , data = request_data)
self .__send_request(command)
return Response( self .__get_response())
def set_power (self, power: int ) -> Response: # 8.4.6 Set Power
assert 0 <= power <= 30
command: Command = Command( CMD_SET_READER_POWER , data = bytearray ([power]))
self .__send_request(command)
return Response( self .__get_response())
Pada class Reader terdapat fungsi:
inventory_answer_mode()
: Inventory (answer mode)
inventory_active_mode()
: Inventory (active mode)
read_memory(...)
: Mengambil data pada memory bank tertentu
write_memory(...)
: Menulis data pada memory bank tertentu
set_power(...)
: Mengatur jarak baca reader terhadap tag (semakin besar semakin jauh)
Saat eksekusi fungsi read_memory(...)
, write_memory(...)
, dan set_power(...)
disarankan atur inventory ke Answer Mode terlebih dahulu (lihat point no 5 ) agar tidak terjadi tabrakan data (data response dari reader terhalang dengan response active mode)
5. Setting reader (atur mode inventory) §
Mode inventory dapat diatur pada program demo:
Pada contoh di atas reader berjalan di mode Active Mode (Multi-Tag) ,
kami rekomendasi menggunakan Multi-Tag tanpa Single Tag Filtering Time agar output tag yang keluar lebih cepat.
6. main.py
§
File ini yang akan dijalankan pertama kali python main.py
, kita akan memanggil class-class yang sudah dibuat.
Jika koneksi via TCP/IP, Anda dapat menggunakan class TcpTransport()
sebagai parameter ke Reader
.
Uncomment fungsi/kode yang ingin digunakan.
from typing import Iterator
from response import hex_readable, Response
from transport import SerialTransport, TcpTransport
from reader import Reader
transport = SerialTransport( 'COM6' , 57600 )
# transport = TcpTransport('192.168.1.192', 6000)
reader = Reader(transport)
#########################################################
# 1. Inventory - Answer Mode
# tags: Iterator[bytes] = reader.inventory_answer_mode()
# for epc in tags:
# print(f'Tag: {hex_readable(tag)}')
#########################################################
# 2. Inventory - Active Mode
try :
responses: Iterator[Response] = reader.inventory_active_mode()
for response in responses:
# print(response)
tag: bytes = response.data
print ( f 'Tag: { hex_readable(tag) } ' )
except KeyboardInterrupt : # Handle Ctrl + C
pass
#########################################################
# 3. Read specific memory, read memory bank TID (0x02), please check 8.2.2 Read Data
# EPC: 12 34 56 78 90 12 34 56 AD EF 01 23
# epc: bytearray = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
# response_read: Response = reader.read_memory(epc=epc, memory_bank=0x02, start_address=0x00, length=0x07)
# print(response_read)
#########################################################
# 4. Write specific memory, write memory bank EPC (0x01), please check 8.2.3 Write Data
# epc: bytearray = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
# data_to_write: bytearray = bytearray([0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00, 0x99, 0x88])
# response_write: Response = reader.write_memory(epc=epc, memory_bank=0x01, start_address=0x02, data_to_write=data_to_write)
# print(response_write)
#########################################################
# 5. Set power, please check 8.4.6 Set Power
# power: int = 28
# response_set_power: Response = reader.set_power(power)
# print(response_set_power)
#########################################################
reader.close()
Video §
VIDEO