# Communicating with the HW-VX6330K Using Python

The HW-VX6330K is a mid-range UHF reader that can both read and write tags. You can communicate with the reader from any programming language, as long as the communication follows the reader's protocol.

For product details, see Electron HW-VX6330K.

In this article we communicate with the reader from Python by implementing the protocol directly (without the SDK), over either a Serial or a TCP/IP connection.

# Preparation

Before getting to the code, prepare the following:

The converter can be purchased here: USB to RS232 CH340 Serial.

# Connecting the Reader to the Computer

# [Serial] RS232 to USB Serial Converter

To connect the HW-VX6330K to the computer, plug it in using an RS232 to USB Serial converter cable.

RS-232 to USB Serial converter

Once connected, make sure a serial port is detected in Device Manager.

Serial port detected
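The same check can also be done from a script. The following is a minimal sketch, assuming pyserial is installed for the real lookup. The helper names `find_ports_by_keyword` and `list_system_ports` are illustrative, the sample port list is made up, and the keyword "CH340" assumes the converter reports its chip name in the port description, which may vary by driver.

```python
from typing import Iterable, List, Tuple


def find_ports_by_keyword(ports: Iterable[Tuple[str, str]],
                          keyword: str = "CH340") -> List[str]:
    """Return the device names whose description contains the keyword."""
    return [device for device, description in ports
            if keyword.lower() in description.lower()]


def list_system_ports() -> List[Tuple[str, str]]:
    """Return (device, description) pairs for the ports pyserial can see."""
    # Imported lazily so the filter above can be tried without pyserial.
    from serial.tools import list_ports
    return [(port.device, port.description or "")
            for port in list_ports.comports()]


# Hypothetical port list, shaped like the output of list_system_ports().
sample_ports = [
    ("COM1", "Communications Port (COM1)"),
    ("COM6", "USB-SERIAL CH340 (COM6)"),
]
print(find_ports_by_keyword(sample_ports))  # ['COM6']
```

On a real machine, `find_ports_by_keyword(list_system_ports())` would perform the same filtering on the ports that are actually present.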

# [TCP/IP] Ethernet Cable

To connect the HW-VX6330K to the computer over an Ethernet cable, first put the computer on the same IP network as the reader.

Connect the Ethernet port

The reader's default IP address is 192.168.1.192/24, on port 6000.

On Windows, this can be set under Control Panel > Network and Sharing Center: select the adapter, then adjust its IP settings to match the reader's network.

Example: reader IP 192.168.1.192/24, computer IP 192.168.1.1/24.
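Whether two addresses share a network can be verified with Python's standard `ipaddress` module. This is a small sketch; `same_network` is a helper name introduced here for illustration, and the second address pair is a made-up counterexample.

```python
from ipaddress import ip_interface


def same_network(reader: str, computer: str) -> bool:
    """Return True if both address/prefix pairs are distinct hosts on one network."""
    reader_if = ip_interface(reader)
    computer_if = ip_interface(computer)
    return (reader_if.network == computer_if.network
            and reader_if.ip != computer_if.ip)


print(same_network("192.168.1.192/24", "192.168.1.1/24"))   # True
print(same_network("192.168.1.192/24", "192.168.0.10/24"))  # False
```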

# Code and Explanation

An explanation of each block/byte of the commands and responses can be found in the protocol documentation.

The code below is split into 5 files:

  • transport.py: handles receiving and sending bytes from/to the reader.
  • command.py: builds the bytes that are sent to the reader.
  • response.py: parses the bytes received from the reader.
  • reader.py: combines the three classes above.
  • main.py: the file that Python runs.

# 1. Transport (transport.py)

This class handles sending and receiving bytes to/from the reader. In this example we create one base class and two derived classes, one for Serial and one for TCP/IP.

from abc import ABC, abstractmethod
from socket import socket, AF_INET, SOCK_STREAM
import serial  # Provided by the pyserial package


class Transport(ABC):
    @abstractmethod
    def read_bytes(self, length: int) -> bytes:
        raise NotImplementedError

    @abstractmethod
    def write_bytes(self, buffer: bytes) -> None:
        raise NotImplementedError

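    def read_frame(self) -> bytes:
        # The first byte holds the number of bytes that follow it in the frame
        length_bytes = self.read_bytes(1)
        if not length_bytes:
            raise TimeoutError('No response from reader')
        frame_length = length_bytes[0]
        return bytes(length_bytes) + bytes(self.read_bytes(frame_length))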

    @abstractmethod
    def close(self) -> None:
        raise NotImplementedError


class TcpTransport(Transport):
    def __init__(self, ip_address: str, port: int, timeout: int = 1) -> None:
        self.socket = socket(AF_INET, SOCK_STREAM)
        self.socket.settimeout(timeout)
        self.socket.connect((ip_address, port))

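    def read_bytes(self, length: int) -> bytes:
        # recv() may return fewer bytes than requested, so loop until complete
        buffer = bytearray()
        while len(buffer) < length:
            chunk = self.socket.recv(length - len(buffer))
            if not chunk:
                raise ConnectionError('Connection closed by reader')
            buffer.extend(chunk)
        return bytes(buffer)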

    def write_bytes(self, buffer: bytes) -> None:
        self.socket.sendall(buffer)

    def close(self) -> None:
        self.socket.close()


class SerialTransport(Transport):
    def __init__(self, serial_port: str, baud_rate: int, timeout: int = 1) -> None:
        self.serial = serial.Serial(serial_port, baud_rate,
                                    timeout=timeout, write_timeout=timeout)

    def read_bytes(self, length: int) -> bytes:
        return self.serial.read(length)

    def write_bytes(self, buffer: bytes) -> None:
        self.serial.write(buffer)

    def close(self) -> None:
        self.serial.close()
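
The framing rule used by read_frame() (one length byte, followed by that many bytes) can be tried without any hardware. Below is a minimal sketch; `split_frames` is a hypothetical helper that is not part of transport.py, and the sample bytes are made up for illustration.

```python
from typing import List


def split_frames(stream: bytes) -> List[bytes]:
    """Split a byte stream into frames, each prefixed by a one-byte length."""
    frames = []
    position = 0
    while position < len(stream):
        frame_length = stream[position]  # Number of bytes after the length byte
        end = position + 1 + frame_length
        if end > len(stream):
            raise ValueError("Incomplete frame at end of stream")
        frames.append(stream[position:end])
        position = end
    return frames


# Two made-up frames back to back, with length bytes 3 and 2.
stream = bytes([0x03, 0xAA, 0xBB, 0xCC, 0x02, 0x11, 0x22])
for frame in split_frames(stream):
    print(frame.hex(" ").upper())
```

Each returned frame still includes its length byte, which matches what read_frame() hands to the Response class.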

# 2. Command (command.py)

This class simplifies building the bytes that are sent to the reader. The list of available commands is in the protocol documentation, under section 4.1 EPC C1 G2(ISO18000-6C)COMMAND.

This class also contains the code that computes the checksum. The checksum algorithm used by the HW-VX6330K reader is CRC-16/MCRF4XX.

CMD_INVENTORY = 0x01
CMD_READ_MEMORY = 0x02
CMD_WRITE_MEMORY = 0x03
CMD_READER_INFORMATION = 0x21
CMD_SET_READER_POWER = 0x2F


class Command:
    def __init__(self, command: int, reader_address: int = 0xFF,
                 data: bytes | int | None = None):
        self.command = command
        self.reader_address = reader_address
        self.data = data
        if isinstance(data, int):
            self.data = bytearray([data])
        if data is None:
            self.data = bytearray()
        self.frame_length = 4 + len(self.data)
        self.base_data = bytearray([self.frame_length, self.reader_address, self.command])
        self.base_data.extend(self.data)

    def serialize(self) -> bytes:
        frame = bytearray(self.base_data)

        # Checksum CRC-16/MCRF4XX: initial value 0xFFFF, reflected polynomial 0x8408
        value = 0xFFFF
        for d in frame:
            value ^= d
            for _ in range(8):
                value = (value >> 1) ^ 0x8408 if value & 0x0001 else (value >> 1)

        # The checksum is appended least significant byte first
        frame.append(value & 0xFF)
        frame.append(value >> 8)
        return bytes(frame)
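
The checksum loop above can be checked in isolation. Below is a minimal sketch in which `crc16_mcrf4xx` and `build_frame` are standalone helper names introduced for illustration. The value 0x6F91 for the ASCII string "123456789" is the published check value for CRC-16/MCRF4XX, and the second line builds the frame for CMD_READER_INFORMATION (0x21) with the default reader address 0xFF.

```python
def crc16_mcrf4xx(data: bytes) -> int:
    """CRC-16/MCRF4XX: init 0xFFFF, reflected polynomial 0x8408, no final XOR."""
    value = 0xFFFF
    for byte in data:
        value ^= byte
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 1 else value >> 1
    return value


def build_frame(command: int, reader_address: int = 0xFF, data: bytes = b"") -> bytes:
    """Length byte, address, command, data, then the CRC with its LSB first."""
    body = bytes([4 + len(data), reader_address, command]) + data
    crc = crc16_mcrf4xx(body)
    return body + bytes([crc & 0xFF, crc >> 8])


print(hex(crc16_mcrf4xx(b"123456789")))    # 0x6f91
print(build_frame(0x21).hex(" ").upper())  # 04 FF 21 19 95
```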

# 3. Response (response.py)

class Response:
    def __init__(self, response_bytes: bytes):
        self.response_bytes = response_bytes
        self.length = response_bytes[0]
        self.reader_address = response_bytes[1]
        self.command = response_bytes[2]
        self.status = response_bytes[3]  # Check 5. LIST OF COMMAND EXECUTION RESULT STATUS
        self.data = response_bytes[4:-2]
        self.checksum = response_bytes[-2:]

    def __str__(self) -> str:
        lines = [
            '>>> START RESPONSE ================================',
            f'RESPONSE       >> {hex_readable(self.response_bytes)}',
            f'READER ADDRESS >> {hex_readable(self.reader_address)}',
            f'COMMAND        >> {hex_readable(self.command)}',
            f'STATUS         >> {hex_readable(self.status)}',
        ]
        if self.data:
            lines.append(f'DATA           >> {hex_readable(self.data)}')
        lines.append(f'CHECKSUM       >> {hex_readable(self.checksum)}')
        lines.append('>>> END RESPONSE   ================================')
        return '\n'.join(lines)


def hex_readable(data: bytes | int, bytes_separator: str = " ") -> str:
    if isinstance(data, int):
        return "{:02X}".format(data)
    return bytes_separator.join("{:02X}".format(x) for x in data)
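
The Response class stores the checksum but does not verify it. As a hedged sketch of how a received frame could be validated, the code below recomputes the CRC over everything except the last two bytes and compares it with the received value. It assumes that responses use the same CRC-16/MCRF4XX and LSB-first byte order as commands. `is_valid_frame` is a hypothetical helper, and the sample frames are chosen only because the checksum of the first one is known to be correct.

```python
def crc16_mcrf4xx(data: bytes) -> int:
    """CRC-16/MCRF4XX: init 0xFFFF, reflected polynomial 0x8408, no final XOR."""
    value = 0xFFFF
    for byte in data:
        value ^= byte
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 1 else value >> 1
    return value


def is_valid_frame(frame: bytes) -> bool:
    """Check the length byte and the trailing CRC (LSB first) of a frame."""
    # Shortest frame: length, address, command and two CRC bytes.
    if len(frame) < 5 or frame[0] != len(frame) - 1:
        return False
    expected = crc16_mcrf4xx(frame[:-2])
    received = frame[-2] | (frame[-1] << 8)
    return expected == received


good = bytes.fromhex("04 FF 21 19 95")
bad = bytes.fromhex("04 FF 21 19 96")  # Last byte corrupted
print(is_valid_frame(good), is_valid_frame(bad))  # True False
```

A check like this could run before the bytes are passed to Response, so that a corrupted frame is rejected instead of being parsed.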

# 4. Reader (reader.py)

The Reader class below demonstrates the following operations:

  • Inventory (Answer Mode)

    Remember to set the work mode to Answer Mode in the demo program.

  • Read memory (reads the TID memory bank)
  • Write memory (writes the EPC memory bank)
  • Set power (the reader's read range for tags)

For the differences between the memory banks, see the article Memory Bank EPC UHF Gen2 Air Interface Protocol.

from typing import Iterator
from transport import Transport
from command import (Command, CMD_INVENTORY, CMD_READ_MEMORY,
                     CMD_WRITE_MEMORY, CMD_SET_READER_POWER)
from response import Response


class Reader:
    def __init__(self, transport: Transport) -> None:
        self.transport = transport

    def close(self) -> None:
        self.transport.close()

    def __send_request(self, command: Command) -> None:
        self.transport.write_bytes(command.serialize())

    def __get_response(self) -> bytes:
        return self.transport.read_frame()

    def inventory(self) -> Iterator[bytes]:  # 8.2.1 Inventory (Answer Mode)
        command = Command(CMD_INVENTORY)
        self.__send_request(command)

        response = Response(self.__get_response())
        data = response.data
        if not data:  # No data block, so there are no tags to report
            return
        tag_count = data[0]  # First data byte: number of tags found

        pointer = 1
        for _ in range(tag_count):
            tag_len = data[pointer]  # Length of this tag's EPC in bytes
            tag_start = pointer + 1
            tag_end = tag_start + tag_len
            yield data[tag_start:tag_end]
            pointer = tag_end

    def read_memory(self, epc: bytes, memory_bank: int, start_address: int, length: int,
                    access_password: bytes = bytes(4)) -> Response:  # 8.2.2 Read Data
        request_data = bytearray()
        request_data.extend(bytearray([int(len(epc) / 2)]))  # EPC Length in word
        request_data.extend(epc)
        request_data.extend(bytearray([memory_bank, start_address, length]))
        request_data.extend(access_password)
        command = Command(CMD_READ_MEMORY, data=request_data)
        self.__send_request(command)

        return Response(self.__get_response())

    def write_memory(self, epc: bytes, memory_bank: int, start_address: int,
                     data_to_write: bytes,
                     access_password: bytes = bytes(4)) -> Response:  # 8.2.3 Write Data
        request_data = bytearray()
        request_data.extend(bytearray([int(len(data_to_write) / 2)]))  # Data length in word
        request_data.extend(bytearray([int(len(epc) / 2)]))  # EPC Length in word
        request_data.extend(epc)
        request_data.extend(bytearray([memory_bank, start_address]))
        request_data.extend(data_to_write)
        request_data.extend(access_password)
        command = Command(CMD_WRITE_MEMORY, data=request_data)
        self.__send_request(command)

        return Response(self.__get_response())

    def set_power(self, power: int) -> Response:  # 8.4.6 Set Power
        if not 0 <= power <= 30:
            raise ValueError('power must be between 0 and 30')

        command = Command(CMD_SET_READER_POWER, data=bytearray([power]))
        self.__send_request(command)

        return Response(self.__get_response())

In inventory(), the response data block can contain several EPCs at once. Its first byte (response.data[0]) gives the number of tags found around the reader.
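
To illustrate that layout, the sketch below parses a data block offline. `parse_inventory_data` is a standalone helper written for this example, and the sample bytes are made up rather than captured from a real reader.

```python
from typing import List


def parse_inventory_data(data: bytes) -> List[bytes]:
    """Split an inventory data block: tag count, then (length, EPC) pairs."""
    if not data:
        return []
    tag_count = data[0]
    epcs = []
    pointer = 1
    for _ in range(tag_count):
        epc_length = data[pointer]  # Length of the EPC that follows, in bytes
        epc_start = pointer + 1
        epcs.append(data[epc_start:epc_start + epc_length])
        pointer = epc_start + epc_length
    return epcs


# Made-up data block: 2 tags, with EPC lengths of 4 and 2 bytes.
sample = bytes([0x02,
                0x04, 0xE2, 0x00, 0x00, 0x01,
                0x02, 0xAB, 0xCD])
for epc in parse_inventory_data(sample):
    print(epc.hex(" ").upper())
```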

# 5. main.py

This is the file to run first, with python main.py. It uses the classes created above. If you connect over TCP/IP, pass a TcpTransport() instance to Reader instead.

from response import hex_readable
from transport import SerialTransport, TcpTransport
from reader import Reader


transport = SerialTransport('COM6', 57600)
# transport = TcpTransport('192.168.1.192', 6000)
reader = Reader(transport)

#########################################################

# 1. Inventory - Answer Mode
tags = reader.inventory()
for epc in tags:
    print(f'EPC: {hex_readable(epc)}')

#########################################################

# 2. Read specific memory, read memory bank TID (0x02), please check 8.2.2 Read Data
# EPC: 12 34 56 78 90 12 34 56 AD EF 01 23
epc = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
response_read = reader.read_memory(epc=epc, memory_bank=0x02, start_address=0x00, length=0x07)
print(response_read)

#########################################################

# 3. Write specific memory, write memory bank EPC (0x01), please check 8.2.3 Write Data
epc = bytearray([0x12, 0x34, 0x56, 0x78, 0x90, 0x12, 0x34, 0x56, 0xAD, 0xEF, 0x01, 0x23])
data_to_write = bytearray([0x99, 0x88, 0x77, 0x66, 0x55, 0x44, 0x33, 0x22, 0x11, 0x00, 0x99, 0x88])
response_write = reader.write_memory(epc=epc, memory_bank=0x01, start_address=0x02, data_to_write=data_to_write)
print(response_write)

#########################################################

# 4. Set power, please check 8.4.6 Set Power
power = 28
response_set_power = reader.set_power(power)
print(response_set_power)

#########################################################

reader.close()
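
To make step 2 of main.py concrete, this sketch builds the data block that read_memory() assembles for that call and prints it byte by byte. `MemoryBank` and `build_read_payload` are names introduced here for illustration. The values 0x01 (EPC) and 0x02 (TID) come from the comments in main.py, while 0x00 (Reserved) and 0x03 (User) follow the EPC Gen2 memory bank numbering.

```python
from enum import IntEnum


class MemoryBank(IntEnum):
    RESERVED = 0x00
    EPC = 0x01
    TID = 0x02
    USER = 0x03


def build_read_payload(epc: bytes, memory_bank: int, start_address: int,
                       length: int, access_password: bytes = bytes(4)) -> bytes:
    """Mirror the byte order used by Reader.read_memory()."""
    if len(epc) % 2:
        raise ValueError("EPC length must be a whole number of 16-bit words")
    payload = bytearray([len(epc) // 2])  # EPC length in words
    payload += epc
    payload += bytes([memory_bank, start_address, length])
    payload += access_password
    return bytes(payload)


epc = bytes.fromhex("12 34 56 78 90 12 34 56 AD EF 01 23")
payload = build_read_payload(epc, MemoryBank.TID, start_address=0x00, length=0x07)
print(len(payload))  # 20
print(payload.hex(" ").upper())
```

The Command class then adds 4 to this length for the address, command and two checksum bytes, so the length byte of the full frame for this request is 24 (0x18).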

# Other Information

Last Updated: 2023-09-27