HW-VX6330K is a mid-range UHF reader that can both read from and write to tags. You can communicate with the reader from any programming language, as long as the exchange follows the reader's protocol.

In this article we communicate with the reader from Python by implementing the protocol directly (without the SDK), over either a Serial or a TCP/IP connection.

Preparation

Before getting to the code, prepare the following:

  • The protocol documentation, as a reference
  • An RS232 to USB Serial converter, or an Ethernet cable
  • Python 3.11 or later
  • The pyserial library, used for Serial (RS232) communication
    • pip install pyserial

The converter can be purchased at the following link: USB to RS232 CH340 Serial.

Connect the Reader to the Computer

RS232

To connect the HW-VX6330K to the computer, simply plug in the RS232 to USB Serial converter cable.

RS-232 to USB Serial converter

Once connected, make sure a serial port shows up in Device Manager.

Serial port detected

TCP/IP

To connect the HW-VX6330K to the computer with an Ethernet cable, first put the computer on the same IP network as the reader.

Connect the Ethernet port

The reader's default IP address is 192.168.1.192/24, on port 6000.

On Windows, this can be set in Control Panel > Network and Sharing Center: select the adapter, then adjust its IP settings.

Example: reader IP 192.168.1.192/24, computer IP 192.168.1.1/24.
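
As a quick sanity check of the addressing, the following sketch uses Python's standard ipaddress module to confirm that the computer's address and the reader's default address fall in the same /24 network. The helper name same_network is an illustrative assumption, not part of the reader's protocol or SDK.

```python
from ipaddress import ip_interface


def same_network(host_a: str, host_b: str) -> bool:
    """Return True if both 'address/prefix' strings belong to the same IP network."""
    return ip_interface(host_a).network == ip_interface(host_b).network


reader_ip = "192.168.1.192/24"   # reader default address
computer_ip = "192.168.1.1/24"   # example computer address
print(same_network(reader_ip, computer_ip))       # True
print(same_network(reader_ip, "192.168.0.1/24"))  # False
```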

Code and Explanation

The meaning of each block/byte of a command or response is described in the protocol documentation.

The code below is split into 5 files:

  • transport.py: handles receiving bytes from and sending bytes to the reader.
  • command.py: builds the bytes that are sent to the reader.
  • response.py: parses the bytes received from the reader.
  • reader.py: combines the three classes above.
  • main.py: the file that Python runs.

1. Transport (transport.py)

This class handles sending bytes to and receiving bytes from the reader. In this example we create one base class and two derived classes, one for Serial and one for TCP/IP.

from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
import serial
 
 
class Transport(ABC):
    @abstractmethod
    def read_bytes(self, length: int) -> bytes:
        raise NotImplementedError
 
    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        raise NotImplementedError
 
    def read_frame(self) -> bytes | None:
        # The first byte holds the number of bytes that follow it in the frame
        length_bytes = self.read_bytes(1)
        if not length_bytes:
            return None
        frame_length = length_bytes[0]
        data = length_bytes + self.read_bytes(frame_length)
        return bytes(data)
 
    @abstractmethod
    def close(self) -> None:
        raise NotImplementedError
 
 
class TcpTransport(Transport):
    def __init__(self, ip_address: str, port: int, timeout: int = 1) -> None:
        self.socket = socket(AF_INET, SOCK_STREAM)
        self.socket.settimeout(timeout)
        self.socket.connect((ip_address, port))
 
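    def read_bytes(self, length: int) -> bytes:
        # recv() may return fewer bytes than requested, so loop until done
        buffer = bytearray()
        while len(buffer) < length:
            chunk = self.socket.recv(length - len(buffer))
            if not chunk:  # Connection closed by the reader
                break
            buffer.extend(chunk)
        return bytes(buffer)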
 
    def write_bytes(self, buffer: bytes) -> None:
        self.socket.sendall(buffer)
 
    def close(self) -> None:
        self.socket.close()
 
 
class SerialTransport(Transport):
    def __init__(self, serial_port: str, baud_rate: int, timeout: int = 1) -> None:
        self.serial = serial.Serial(serial_port, baud_rate,
                                    timeout=timeout, write_timeout=timeout)
 
    def read_bytes(self, length: int) -> bytes:
        return self.serial.read(length)
 
    def write_bytes(self, buffer: bytes) -> None:
        self.serial.write(buffer)
 
    def close(self) -> None:
        self.serial.close()
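
To see how read_frame() splits a byte stream into frames without any hardware attached, here is a minimal self-contained sketch. FakeTransport is a hypothetical in-memory stand-in for the transport classes above (it is not one of the article's files), and the sample bytes are made up purely to exercise the length-prefix logic.

```python
from io import BytesIO
from typing import Optional


class FakeTransport:
    """Hypothetical in-memory transport that replays a fixed byte stream."""

    def __init__(self, stream: bytes) -> None:
        self._stream = BytesIO(stream)

    def read_bytes(self, length: int) -> bytes:
        return self._stream.read(length)

    def read_frame(self) -> Optional[bytes]:
        # Same idea as Transport.read_frame(): the first byte is the number
        # of bytes that follow it.
        length_bytes = self.read_bytes(1)
        if not length_bytes:
            return None
        return length_bytes + self.read_bytes(length_bytes[0])


# Two made-up frames back to back, with length bytes 3 and 2
stream = bytes([0x03, 0xAA, 0xBB, 0xCC, 0x02, 0x11, 0x22])
transport = FakeTransport(stream)
print(transport.read_frame().hex(" "))  # 03 aa bb cc
print(transport.read_frame().hex(" "))  # 02 11 22
print(transport.read_frame())           # None
```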

2. Command (command.py)

This class simplifies building the bytes that are sent to the reader. The list of available commands is in the protocol documentation, in section 4.1 EPC C1 G2(ISO18000-6C)COMMAND.

The class also computes the checksum. The checksum algorithm used by the HW-VX6330K reader is CRC-16/MCRF4XX.

CMD_INVENTORY = 0x01
CMD_READ_MEMORY = 0x02
CMD_WRITE_MEMORY = 0x03
CMD_READER_INFORMATION = 0x21
CMD_SET_READER_POWER = 0x2F
 
 
class Command:
    def __init__(self, command: int, reader_address: int = 0xFF,
                 data: bytes | int | None = None):
        self.command = command
        self.reader_address = reader_address
        self.data = data
        if isinstance(data, int):
            self.data = bytearray([data])
        if data is None:
            self.data = bytearray()
        # The length byte counts everything after itself: address, command, data and 2 CRC bytes
        self.frame_length = 4 + len(self.data)
        self.base_data = bytearray([self.frame_length, self.reader_address, self.command])
        self.base_data.extend(self.data)

    def serialize(self) -> bytes:
        serialize = self.base_data

        # Checksum CRC-16/MCRF4XX: initial value 0xFFFF, reflected polynomial 0x8408
        value = 0xFFFF
        for d in serialize:
            value ^= d
            for _ in range(8):
                value = (value >> 1) ^ 0x8408 if value & 0x0001 else (value >> 1)
        crc_msb = value >> 0x08
        crc_lsb = value & 0xFF

        # The checksum is appended least significant byte first
        return bytes(serialize) + bytes([crc_lsb, crc_msb])
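
The checksum and frame layout used by serialize() can be exercised on their own. The sketch below restates the same CRC-16/MCRF4XX loop as a standalone function and builds the Inventory command frame for the default reader address 0xFF. The names crc16_mcrf4xx and build_frame are illustrative and not part of command.py.

```python
def crc16_mcrf4xx(data: bytes) -> int:
    """CRC-16/MCRF4XX: initial value 0xFFFF, reflected polynomial 0x8408, no final XOR."""
    value = 0xFFFF
    for byte in data:
        value ^= byte
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 1 else value >> 1
    return value


def build_frame(command: int, data: bytes = b"", reader_address: int = 0xFF) -> bytes:
    """Build: length | address | command | data | CRC LSB | CRC MSB."""
    length = 4 + len(data)  # every byte after the length byte itself
    body = bytes([length, reader_address, command]) + data
    crc = crc16_mcrf4xx(body)
    return body + bytes([crc & 0xFF, crc >> 8])


# Standard check value of CRC-16/MCRF4XX for the ASCII string "123456789"
print(hex(crc16_mcrf4xx(b"123456789")))    # 0x6f91

# Inventory command (0x01) with no data
print(build_frame(0x01).hex(" ").upper())  # 04 FF 01 1B B4
```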
 

3. Response (response.py)

class Response:
    def __init__(self, response_bytes: bytes):
        self.response_bytes = response_bytes
        self.length = response_bytes[0]
        self.reader_address = response_bytes[1]
        self.command = response_bytes[2]
        self.status = response_bytes[3]  # Check 5. LIST OF COMMAND EXECUTION RESULT STATUS
        self.data = response_bytes[4:-2]
        self.checksum = response_bytes[-2:]
 
    def __str__(self) -> str:
        return_value = ''
        value = '>>> START RESPONSE ================================'
        return_value = f'{return_value}\n{value}'
        value = f'RESPONSE       >> {hex_readable(self.response_bytes)}'  # Response
        return_value = f'{return_value}\n{value}'
        value = f'READER ADDRESS >> {hex_readable(self.reader_address)}'  # Reader Address
        return_value = f'{return_value}\n{value}'
        value = f'COMMAND        >> {hex_readable(self.command)}'  # Command
        return_value = f'{return_value}\n{value}'
        value = f'STATUS         >> {hex_readable(self.status)}'  # Status
        return_value = f'{return_value}\n{value}'
        if self.data:
            value = f'DATA           >> {hex_readable(self.data)}'  # Data
            return_value = f'{return_value}\n{value}'
        value = f'CHECKSUM       >> {hex_readable(self.checksum)}'  # Checksum
        return_value = f'{return_value}\n{value}'
        value = '>>> END RESPONSE   ================================'
        return_value = f'{return_value}\n{value}'
        return return_value.strip()
 
 
def hex_readable(data: bytes | int, bytes_separator: str = " ") -> str:
    if isinstance(data, int):
        return "{:02X}".format(data)
    return bytes_separator.join("{:02X}".format(x) for x in data)
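
The Response class above stores the checksum but does not verify it. As a minimal sketch of how a received frame could be validated, the code below recomputes CRC-16/MCRF4XX over the whole frame: since the checksum is appended least significant byte first, a valid frame yields 0. The function names and the sample frame are assumptions for illustration; the sample is built in code rather than captured from a real reader.

```python
def crc16_mcrf4xx(data: bytes) -> int:
    """CRC-16/MCRF4XX: initial value 0xFFFF, reflected polynomial 0x8408, no final XOR."""
    value = 0xFFFF
    for byte in data:
        value ^= byte
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 1 else value >> 1
    return value


def is_valid_frame(frame: bytes) -> bool:
    """Check the length byte and the trailing checksum of a complete frame."""
    if len(frame) < 5 or frame[0] != len(frame) - 1:
        return False
    return crc16_mcrf4xx(frame) == 0


# Made-up response: length 0x05, address 0x00, command 0x2F, status 0x00, then the checksum
body = bytes([0x05, 0x00, 0x2F, 0x00])
crc = crc16_mcrf4xx(body)
frame = body + bytes([crc & 0xFF, crc >> 8])

print(is_valid_frame(frame))      # True
corrupted = frame[:3] + bytes([0x01]) + frame[4:]  # flip one bit in the status byte
print(is_valid_frame(corrupted))  # False
```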

4. Reader (reader.py)

from typing import Iterator
from transport import Transport
from command import *
from response import *
 
 
class Reader:
    def __init__(self, transport: Transport) -> None:
        self.transport = transport
 
    def close(self) -> None:
        self.transport.close()
 
    def __send_request(self, command: Command) -> None:
        self.transport.write_bytes(command.serialize())
 
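    def __get_response(self) -> bytes:
        frame = self.transport.read_frame()
        if frame is None:  # Nothing was received before the timeout
            raise TimeoutError('No response from reader')
        return frame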
 
    def inventory_answer_mode(self) -> Iterator[bytes]:  # 8.2.1 Inventory (Answer Mode)
        command: Command = Command(CMD_INVENTORY)
        self.__send_request(command)

        response: Response = Response(self.__get_response())
        data: bytes = response.data
        if not data:  # No tag data returned
            return
        tag_count: int = data[0]

        # Data layout: tag count, then for each tag a length byte followed by the EPC
        pointer: int = 1
        for _ in range(tag_count):
            tag_len = data[pointer]
            tag_start = pointer + 1
            tag_end = tag_start + tag_len
            yield data[tag_start:tag_end]
            pointer = tag_end
 
    def inventory_active_mode(self) -> Iterator[Response]:
        while True:
            try:
                raw_response: bytes | None = self.__get_response()
            except TimeoutError:
                continue
            if raw_response is None:
                continue
            response: Response = Response(raw_response)
            yield response
 
    def read_memory(self, epc: bytes, memory_bank: int, start_address: int, length: int,
                    access_password: bytes = bytes(4)) -> Response:  # 8.2.2 Read Data
        request_data = bytearray()
        request_data.extend(bytearray([int(len(epc) / 2)]))  # EPC Length in word
        request_data.extend(epc)
        request_data.extend(bytearray([memory_bank, start_address, length]))
        request_data.extend(access_password)
        command: Command = Command(CMD_READ_MEMORY, data=request_data)
        self.__send_request(command)
 
        return Response(self.__get_response())
 
    def write_memory(self, epc: bytes, memory_bank: int, start_address: int,
                     data_to_write: bytes,
                     access_password: bytes = bytes(4)) -> Response:  # 8.2.4 Write Data
        request_data: bytearray = bytearray()
        request_data.extend(bytearray([int(len(data_to_write) / 2)]))  # Data length in word
        request_data.extend(bytearray([int(len(epc) / 2)]))  # EPC Length in word
        request_data.extend(epc)
        request_data.extend(bytearray([memory_bank, start_address]))
        request_data.extend(data_to_write)
        request_data.extend(access_password)
        command: Command = Command(CMD_WRITE_MEMORY, data=request_data)
        self.__send_request(command)
 
        return Response(self.__get_response())
 
    def set_power(self, power: int) -> Response:  # 8.4.6 Set Power
        assert 0 <= power <= 30
 
        command: Command = Command(CMD_SET_READER_POWER, data=bytearray([power]))
        self.__send_request(command)
 
        return Response(self.__get_response())
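
The parsing loop in inventory_answer_mode() can be tried without a reader. This sketch assumes the data layout that the loop above implies (one tag-count byte, then for each tag a length byte followed by that many EPC bytes). The name parse_inventory_data and the sample bytes are illustrative.

```python
from typing import Iterator


def parse_inventory_data(data: bytes) -> Iterator[bytes]:
    """Yield each EPC from: count, then (length, EPC bytes) repeated count times."""
    if not data:
        return
    tag_count = data[0]
    pointer = 1
    for _ in range(tag_count):
        tag_len = data[pointer]
        start = pointer + 1
        end = start + tag_len
        yield data[start:end]
        pointer = end


# Made-up payload with two tags: a 4-byte EPC and a 2-byte EPC
sample = bytes([0x02, 0x04, 0xE2, 0x00, 0x10, 0x01, 0x02, 0xAB, 0xCD])
for epc in parse_inventory_data(sample):
    print(epc.hex(" ").upper())
# E2 00 10 01
# AB CD
```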

The Reader class provides the following functions:

  1. inventory_answer_mode(): Inventory (answer mode)
  2. inventory_active_mode(): Inventory (active mode)
  3. read_memory(...): Reads data from a given memory bank
  4. write_memory(...): Writes data to a given memory bank
  5. set_power(...): Sets the reader power, which controls the read range to the tag (the higher the value, the longer the range)

Before calling read_memory(...), write_memory(...), or set_power(...), it is recommended to switch inventory to Answer Mode first (see point 5) to avoid data collisions, where the reader's response to the command gets mixed up with active-mode responses.
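
For reference, the request payload that read_memory() assembles can be sketched as a standalone function. The field order mirrors the Reader code above (EPC length in words, EPC, memory bank, start address, length, access password). The name build_read_payload is illustrative, and the EPC is the sample value used in main.py.

```python
def build_read_payload(epc: bytes, memory_bank: int, start_address: int,
                       length: int, access_password: bytes = bytes(4)) -> bytes:
    """Assemble the data field of a read request, mirroring Reader.read_memory()."""
    if len(epc) % 2:
        raise ValueError("EPC must be a whole number of 16-bit words")
    payload = bytearray([len(epc) // 2])  # EPC length in words (2 bytes each)
    payload += epc
    payload += bytes([memory_bank, start_address, length])
    payload += access_password
    return bytes(payload)


epc = bytes.fromhex("1234567890123456ADEF0123")
payload = build_read_payload(epc, memory_bank=0x02, start_address=0x00, length=0x07)
print(payload.hex(" ").upper())
# 06 12 34 56 78 90 12 34 56 AD EF 01 23 02 00 07 00 00 00 00
```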

5. Reader settings (setting the inventory mode)

The inventory mode can be set in the demo program:

Inventory mode

In the example above the reader runs in Active Mode (Multi-Tag). We recommend Multi-Tag without Single Tag Filtering Time so that tags are reported faster.

6. main.py

This is the file that gets run first (python main.py); it uses the classes created above. If the connection is over TCP/IP, pass a TcpTransport() instance to Reader instead.

Uncomment the function/code you want to use.

from typing import Iterator
 
from response import hex_readable, Response
from transport import SerialTransport, TcpTransport
from reader import Reader
 
transport = SerialTransport('COM6', 57600)
# transport = TcpTransport('192.168.1.192', 6000)
reader = Reader(transport)
 
#########################################################
 
# 1. Inventory - Answer Mode
# tags: Iterator[bytes] = reader.inventory_answer_mode()
# for tag in tags:
#     print(f'Tag: {hex_readable(tag)}')
 
#########################################################
 
# 2. Inventory - Active Mode
try:
    responses: Iterator[Response] = reader.inventory_active_mode()
    for response in responses:
        # print(response)
        tag: bytes = response.data
        print(f'Tag: {hex_readable(tag)}')
except KeyboardInterrupt:  # Handle Ctrl + C
    pass
 
#########################################################
 
# 3. Read specific memory, read memory bank TID (0x02), please check 8.2.2 Read Data
# EPC: 12 34 56 78 90 12 34 56 AD EF 01 23
# epc: bytearray = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
# response_read: Response = reader.read_memory(epc=epc, memory_bank=0x02, start_address=0x00, length=0x07)
# print(response_read)
 
#########################################################
 
# 4. Write specific memory, write memory bank EPC (0x01), please check 8.2.3 Write Data
# epc: bytearray = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
# data_to_write: bytearray = bytearray([0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00, 0x99, 0x88])
# response_write: Response = reader.write_memory(epc=epc, memory_bank=0x01, start_address=0x02, data_to_write=data_to_write)
# print(response_write)
 
#########################################################
 
# 5. Set power, please check 8.4.6 Set Power
# power: int = 28
# response_set_power: Response = reader.set_power(power)
# print(response_set_power)
 
#########################################################
 
reader.close()

Other information

Video