# Inventory Active Mode Reader EL-UHF-RC4-2 Serial RS232 dengan Raspberry Pi Pico

Menggunakan Raspberry Pi Pico kita dapat mengambil hasil inventory Reader EL-UHF-RC4-2 dari interface Serial RS232-nya menggunakan Max3232.

# Persiapan

Berikut beberapa barang yang diperlukan:

# Koneksi antar Pin

Adapun koneksi antar Pin seperti pada gambar berikut.

Koneksi antar Pin

Max3232 Raspberry Pi Pico
VCC 3V3 (Out)
TXD GP0 (UART0 - TX)
RXD GP1 (UART0 - RX)
GND GND

Kabel jumper dari TXD (Max3232) ➜ GP0 (Raspberry Pi Pico) bersifat opsional, karena Raspberry Pi Pico tidak mengirim data apa pun ke Reader; pada Active Mode, Reader mengirim data secara terus-menerus ke Raspberry Pi Pico setiap kali tag terdeteksi.

Karena tutorial ini menggunakan Max3232 Female, kita memerlukan kabel DB9 Serial RS232 Male to Male (serial 2-3 Cross Connect) untuk menghubungkan Max3232 Female ➜ RS232 Female Reader.

# Kode dan Penjelasan

Penjelasan blok/tiap byte command/response dapat dilihat di dokumentasi protokol.

Kode di bawah ini dipisah menjadi 3 file:

Jangan lupa upload response.py dan utils.py ke Raspberry Pi Pico terlebih dahulu sebelum menjalankan main.py.

# 1. Response (response.py)

from utils import hex_readable, calculate_checksum, calculate_rssi


class Tag:
    """One tag report extracted from an inventory response.

    The fields are stored exactly as received; conversion to a readable
    form only happens when the object is rendered with ``str()``.
    """

    def __init__(self, rssi: bytes, antenna: int, channel: int, data: bytes) -> None:
        """Store the raw RSSI bytes, antenna number, channel number and tag data."""
        self.data: bytes = data
        self.channel: int = channel
        self.antenna: int = antenna
        self.rssi: bytes = rssi

    def __str__(self) -> str:
        """Return a one-line summary containing the RSSI and the tag data in hex."""
        rssi_text: str = str(calculate_rssi(self.rssi))
        # Only the first three characters of the decimal RSSI string are shown.
        # NOTE(review): presumably this trims a trailing digit - confirm the RSSI unit.
        shown_rssi: str = rssi_text[:3]
        data_text: str = hex_readable(self.data)
        return f'Tag(RSSI: {shown_rssi}, data: {data_text})'


class Response:
    """Parse one active-mode inventory frame received from the reader.

    Frame layout as consumed by this parser:

    * byte 0      - header, must be ``0xCF``
    * bytes 2-3   - command code
    * byte 4      - length ``N`` of the section that follows
    * next N      - status byte followed by the body
    * last 2      - checksum (MSB first) over everything before it

    Body layout: bytes 0-1 RSSI, byte 2 antenna, byte 3 channel,
    byte 4 tag length, then the tag data itself.

    Attributes:
        command: the two command bytes from the header.
        status: the status byte (``0x00`` means success).
        tag: the parsed :class:`Tag`.

    Raises:
        AssertionError: wrong header, non-success status, truncated frame,
            checksum mismatch or inconsistent tag length.
        IndexError: the buffer is too short to contain the fixed fields.
    """

    def __init__(self, response: bytes) -> None:
        self.tag: Tag | None = None

        header_section: bytes = response[0:5]
        assert header_section[0] == 0xCF, 'Header must be 0xCF'
        length: int = header_section[4]
        self.command: bytes = header_section[2:4]

        # The frame ends after `length` bytes (status + body) plus the 2 checksum bytes.
        # Slicing by the declared length keeps trailing bytes in the buffer out of the frame.
        frame_end: int = 5 + length + 2
        body_n_checksum_section: bytes = response[5:frame_end]
        self.status: int = body_n_checksum_section[0]
        assert self.status == 0x00, 'Status is not 0x00 (success)'
        assert len(body_n_checksum_section) == length + 2, 'Truncated response'

        body_section: bytes = body_n_checksum_section[1:-2]
        # Take the checksum from the frame itself, not from the end of the raw buffer.
        checksum: bytes = body_n_checksum_section[-2:]

        # Verify checksum: it covers the header, the status byte and the body.
        crc_msb, crc_lsb = calculate_checksum(bytearray(response[0:5 + length]))
        assert checksum[0] == crc_msb and checksum[1] == crc_lsb, 'Checksum mismatch'

        tag_length: int = body_section[4]
        tag_data: bytes = body_section[5:5 + tag_length]
        assert tag_length == len(tag_data), 'Tag length does not match tag data'

        # Use the length-delimited slice so bytes after the tag data never leak into it.
        self.tag = Tag(rssi=body_section[0:2], antenna=body_section[2], channel=body_section[3], data=tag_data)

    def __str__(self) -> str:
        """Return a one-line summary with the status and the parsed tag."""
        return f'Response(status: {self.status}, tag: {self.tag})'

# 2. Utility (utils.py)

def hex_readable(data_bytes: bytes, separator: str = " ") -> str:
    """Render each byte as two upper-case hex digits joined by ``separator``."""
    hex_pairs = ['%02X' % byte for byte in data_bytes]
    return separator.join(hex_pairs)


def calculate_checksum(data: bytes) -> bytearray:
    """Compute the 16-bit CRC of ``data``.

    The CRC starts at 0xFFFF, is processed least-significant bit first with
    the polynomial 0x8408 and has no final XOR.

    Returns:
        A two-element bytearray ``[msb, lsb]`` of the resulting CRC.
    """
    crc = 0xFFFF
    for byte in data:
        crc ^= byte
        for _bit in range(8):
            carry = crc & 0x0001
            crc >>= 1
            if carry:
                crc ^= 0x8408
    return bytearray((crc >> 0x08, crc & 0xFF))


def calculate_rssi(rssi: bytes) -> int:  # Python: `int.from_bytes(rssi, "big", signed=True)`
    """Decode big-endian two's-complement bytes into a signed integer.

    Returns 0 for empty input.
    """
    if not len(rssi):
        return 0

    unsigned_value = int.from_bytes(rssi, 'big')
    if not rssi[0] & 0x80:
        return unsigned_value

    # Sign bit set: subtract 2**(number of bits) to obtain the negative value.
    return unsigned_value - (1 << (8 * len(rssi)))

# 3. main.py

from machine import Pin, UART
from time import sleep
from response import Response


# Only RX is configured because this script never sends anything to the reader.
# To configure TX as well, use:
#   pin_tx: Pin = Pin(0)
#   uart: UART = UART(0, baudrate=115_200, tx=pin_tx, rx=pin_rx)
pin_rx: Pin = Pin(1)
uart: UART = UART(0, baudrate=115_200, rx=pin_rx)


def _read_exact(count: int, attempts: int = 20) -> bytes:
    """Read up to ``count`` bytes from the UART, retrying while data is still arriving.

    ``uart.read()`` may return ``None`` (nothing arrived before the timeout) or
    fewer bytes than requested, so the read is repeated a bounded number of times.
    The result is shorter than ``count`` only when the data never arrived.
    """
    received: bytes = b''
    for _ in range(attempts):
        if len(received) >= count:
            break
        chunk = uart.read(count - len(received))
        if chunk:
            received += chunk
        else:
            sleep(0.005)  # give the remaining bytes time to arrive
    return received


while True:
    if not uart.any():
        continue

    # Hunt for the 0xCF header byte one byte at a time, so a partial or corrupted
    # frame cannot leave the stream permanently out of sync.
    first_byte: bytes = _read_exact(1)
    if first_byte != b'\xCF':
        continue

    # Get the rest of the 5-byte header; its fifth byte carries the body length.
    header_response: bytes = first_byte + _read_exact(4)
    if len(header_response) < 5:
        continue

    body_length: int = header_response[-1]
    # + 2 for the checksum (2 bytes)
    body_response: bytes = _read_exact(body_length + 2)
    if len(body_response) < body_length + 2:
        continue  # incomplete frame: drop it and look for the next header

    raw_response: bytes = header_response + body_response
    try:
        response: Response = Response(raw_response)
    except (AssertionError, IndexError) as error:
        # A single bad frame must not stop the inventory loop.
        print(f'Invalid response skipped -> {error}')
        continue

    print(f'Response -> {response}')

# Hasil setelah main.py dijalankan

Response -> Response(status: 0, tag: Tag(RSSI: -38, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -37, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -40, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -42, data: 12 34 56 78 90 12 34 56 78 90 A0 01))

# Video


Cara atur work mode, dapat dilihat di video berikut:

# UPDATE: Raspberry Pi Pico W - Inventory TCP/IP (Ethernet)

Port Ethernet reader terhubung ke router. Raspberry Pi Pico W terkoneksi via Wi-Fi.

# 3. main.py

from network import WLAN, STA_IF
from socket import socket, AF_INET, SOCK_STREAM
from time import sleep
from response import Response


# Wi-Fi credentials passed to wlan.connect(); replace the placeholders with real values.
SSID: str = "MY_SSID"
PASSWORD: str = "MY_PASSWORD"
# Address and TCP port of the reader that the socket connects to.
READER_IP_ADDRESS: str = "192.168.1.215"
READER_PORT: int = 2022


def connect() -> str:
    """Join the Wi-Fi network given by SSID/PASSWORD and return this device's IP address.

    Blocks until the interface reports a connection, printing a progress
    message once per second, then prints the interface configuration.
    """
    station: WLAN = WLAN(STA_IF)
    station.active(True)
    station.connect(SSID, PASSWORD)

    while not station.isconnected():
        print('WLAN >> Waiting for connection...')
        sleep(1)

    # The first element of the interface configuration is the IP address.
    assigned_ip: str = station.ifconfig()[0]
    print(station.ifconfig())
    return assigned_ip


def _recv_exact(connection, count: int) -> bytes:
    """Receive exactly ``count`` bytes from ``connection``.

    ``recv()`` on a stream socket may return fewer bytes than requested, so it
    is called repeatedly until the requested amount has been collected. The
    result is shorter than ``count`` only when the peer closed the connection
    (``recv()`` returned an empty result).
    """
    received: bytes = b''
    while len(received) < count:
        chunk: bytes = connection.recv(count - len(received))
        if not chunk:
            break
        received += chunk
    return received


ip_address: str = connect()
# Use a distinct name so the imported `socket` class is not shadowed.
reader_socket = socket(AF_INET, SOCK_STREAM)
try:
    reader_socket.connect((READER_IP_ADDRESS, READER_PORT))

    while True:
        # Get the 5-byte header first; its fifth byte carries the body length.
        header_response: bytes = _recv_exact(reader_socket, 5)
        if len(header_response) < 5:
            print('Reader closed the connection')
            break

        body_length: int = header_response[-1]
        # + 2 for the checksum (2 bytes)
        body_response: bytes = _recv_exact(reader_socket, body_length + 2)
        if len(body_response) < body_length + 2:
            print('Reader closed the connection')
            break

        raw_response: bytes = header_response + body_response
        try:
            response: Response = Response(raw_response)
        except (AssertionError, IndexError) as error:
            # A single bad frame must not stop the inventory loop.
            print(f'Invalid response skipped -> {error}')
            continue

        print(f'Response -> {response}')
finally:
    # Always release the socket, including when the loop ends on an error.
    reader_socket.close()
Last Updated: 2023-10-23