With a Raspberry Pi Pico, we can read the inventory results of the EL-UHF-RC4-2 reader through its RS232 serial interface using a Max3232 converter.

Preparation

The following items are required:

Pin Connections

The pins are connected as shown in the following figure.

Max3232    Raspberry Pi Pico
VCC        3V3 (Out)
TXD        GP0 (UART0 - TX)
RXD        GP1 (UART0 - RX)
GND        GND

The jumper wire from TXD (Max3232) ➜ GP0 (Raspberry Pi Pico) is optional, because the Raspberry Pi Pico does not send any data to the reader. In Active Mode, the reader continuously sends data to the Raspberry Pi Pico whenever a tag is detected.

Because this tutorial uses a female Max3232 module, we need a DB9 RS232 male-to-male serial cable (pins 2 and 3 cross-connected) to connect the female Max3232 ➜ the reader's female RS232 port.

Code and Explanation

An explanation of each block and byte of the command/response can be found in the protocol documentation.
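
The frame layout that response.py relies on can be summarized in a small sketch. The field names below (`address`, `command`, `status`, `payload`) are illustrative labels inferred from how the code in this tutorial slices the bytes, not names taken from the protocol documentation. Byte 1 in particular is never read by the tutorial code, so calling it an address is an assumption. The example frame is made up and its checksum bytes are placeholders.

```python
def split_frame(frame: bytes) -> dict:
    """Split one raw reader frame into its parts (no checksum verification)."""
    if len(frame) < 8:  # 5 header bytes + status + 2 checksum bytes
        raise ValueError("frame too short")
    if frame[0] != 0xCF:
        raise ValueError("unexpected header byte")
    length = frame[4]  # number of bytes in status + payload
    if len(frame) != 5 + length + 2:
        raise ValueError("length byte does not match frame size")
    return {
        "header": frame[0],
        "address": frame[1],  # assumption: not used by the tutorial code
        "command": frame[2:4],
        "length": length,
        "status": frame[5],
        "payload": frame[6:5 + length],
        "checksum": frame[-2:],
    }


# Hypothetical frame for illustration only (checksum bytes are placeholders).
example = bytes([0xCF, 0x00, 0x00, 0x01, 0x03, 0x00, 0xAA, 0xBB, 0x12, 0x34])
parts = split_frame(example)
print(parts["status"], parts["payload"], parts["checksum"])
```

This only slices the frame. Checksum verification and tag decoding are handled by the `Response` class in response.py.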

The code below is split into 3 files:

  • response.py: parses the bytes received from the reader.
  • utils.py: general-purpose helper functions.
  • main.py: the file that the Raspberry Pi Pico runs.

Remember to upload response.py and utils.py to the Raspberry Pi Pico before running main.py.

1. Response (response.py)

from utils import hex_readable, calculate_checksum, calculate_rssi
 
 
class Tag:
    def __init__(self, rssi: bytes, antenna: int, channel: int, data: bytes) -> None:
        self.rssi: bytes = rssi
        self.antenna: int = antenna
        self.channel: int = channel
        self.data: bytes = data
 
    def __str__(self) -> str:
        return f'Tag(RSSI: {str(calculate_rssi(self.rssi))[0:3]}, data: {hex_readable(self.data)})'
 
 
class Response:
    def __init__(self, response: bytes) -> None:
        self.tag: Tag | None = None
 
        header_section: bytes = response[0:5]
        assert header_section[0] == 0xCF  # Header must be 0xCF
        length: int = response[4]
        self.command: bytes = header_section[2:4]
        # Status + body (`length` bytes) followed by 2 checksum bytes, starting at index 5
        body_n_checksum_section: bytes = response[5: 5 + length + 2]
        
        self.status: int = body_n_checksum_section[0]
        assert self.status == 0x00  # 0x00: Success (refer to Documentation)
 
        body_section: bytes = body_n_checksum_section[1:-2]
        checksum: bytes = response[-2:]
 
        # Verify checksum
        verify_checksum: bytearray = bytearray(header_section)
        verify_checksum.extend(bytearray([body_n_checksum_section[0]]))
        verify_checksum.extend(body_section)
        crc_msb, crc_lsb = calculate_checksum(verify_checksum)
        assert checksum[0] == crc_msb and checksum[1] == crc_lsb
 
        tag_length: int = body_section[4]
        tag_data: bytes = body_section[5: tag_length + 5]
        assert(tag_length == len(tag_data))
 
        self.tag = Tag(rssi=body_section[0:2], antenna=body_section[2],
                       channel=body_section[3], data=tag_data)
 
    def __str__(self) -> str:
        return f'Response(status: {self.status}, tag: {self.tag})'

2. Utility (utils.py)

def hex_readable(data_bytes: bytes, separator: str = " ") -> str:
    return separator.join('{:02X}'.format(x) for x in data_bytes)
 
 
def calculate_checksum(data: bytes) -> bytearray:
    value = 0xFFFF
    for d in data:
        value ^= d
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 0x0001 else (value >> 1)
    crc_msb = value >> 0x08
    crc_lsb = value & 0xFF
    return bytearray([crc_msb, crc_lsb])
 
 
# CPython equivalent: `int.from_bytes(rssi, "big", signed=True)`
def calculate_rssi(rssi: bytes) -> int:
    length: int = len(rssi)
    if not length:
        return 0
 
    if rssi[0] & 0x80:
        complemented = bytes(0xFF - x for x in rssi)
        return -(int.from_bytes(complemented, 'big') + 1)
    else:
        return int.from_bytes(rssi, 'big')
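
To sanity-check these helpers in desktop Python before uploading them, the sketch below repeats the checksum loop and compares it with a known reference value. The parameters used here (initial value 0xFFFF, reflected polynomial 0x8408, no final XOR) appear to match the variant commonly catalogued as CRC-16/MCRF4XX, whose published check value for the ASCII string "123456789" is 0x6F91. That identification is a reading of the code, not a statement from the protocol documentation. The names `crc16`, `append_checksum` and `to_signed`, and the sample bytes, are illustrative.

```python
def crc16(data: bytes) -> int:
    """Same loop as calculate_checksum in utils.py, returned as one integer."""
    value = 0xFFFF
    for d in data:
        value ^= d
        for _ in range(8):
            value = (value >> 1) ^ 0x8408 if value & 0x0001 else (value >> 1)
    return value


def append_checksum(frame_without_crc: bytes) -> bytes:
    """Append the checksum MSB first, the order Response compares against."""
    crc = crc16(frame_without_crc)
    return frame_without_crc + bytes([crc >> 8, crc & 0xFF])


def to_signed(raw: bytes) -> int:
    """Two's-complement decode, equivalent to calculate_rssi."""
    value = int.from_bytes(raw, "big")
    if raw and raw[0] & 0x80:
        value -= 1 << (8 * len(raw))
    return value


print(hex(crc16(b"123456789")))
frame = append_checksum(bytes([0xCF, 0x00, 0x00, 0x01, 0x01, 0x00]))  # made-up frame
print(crc16(frame[:-2]) == int.from_bytes(frame[-2:], "big"))
print(to_signed(b"\xFE\x7F"))
```

The last line decodes the two bytes 0xFE 0x7F as -385. `Tag.__str__` keeps only the first three characters of that number, which is how a value such as -38 ends up in the printed output.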

3. main.py

from machine import Pin, UART
from time import sleep
from response import Response
 
 
# To also transmit to the reader, define the TX pin and pass it to UART:
# pin_tx: Pin = Pin(0)
# uart: UART = UART(0, baudrate=115_200, tx=pin_tx, rx=pin_rx)
pin_rx: Pin = Pin(1)
uart: UART = UART(0, baudrate=115_200, rx=pin_rx)
 
 
while True:
    if not uart.any():
        continue
 
    # Read the first 5 bytes; the 5th byte (index 4) holds the body length.
    header_response: bytes = uart.read(5)
 
    # uart.read() may return None or fewer bytes than requested.
    if not header_response or len(header_response) < 5:
        continue
 
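    body_length: int = header_response[-1]
    # + 2 for the checksum (2 bytes)
    body_response: bytes = uart.read(body_length + 2)
    # Skip incomplete frames instead of failing on None or a short read.
    if not body_response or len(body_response) < body_length + 2:
        continue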
 
    raw_response: bytes = header_response + body_response
    response: Response = Response(raw_response)
 
    print(f'Response -> {response}')
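
Because `uart.read()` can return fewer bytes than requested (or `None`) while a frame is still arriving, one option is to accumulate incoming bytes in a buffer and hand only complete frames to `Response`. The following is a minimal sketch of that idea and is not part of the original tutorial. The class name `FrameBuffer` and its `feed` method are hypothetical, and it assumes the frame layout used above (header 0xCF, length in byte 4, then 2 checksum bytes). The sample bytes are made up.

```python
class FrameBuffer:
    """Collects raw bytes and returns complete frames (hypothetical helper)."""

    HEADER = 0xCF

    def __init__(self) -> None:
        self._buffer = bytearray()

    def feed(self, chunk) -> list:
        """Add a chunk (bytes or None) and return every complete frame found."""
        if chunk:
            self._buffer.extend(chunk)
        frames = []
        while True:
            # Skip anything before the next header byte to resynchronize.
            start = 0
            while start < len(self._buffer) and self._buffer[start] != self.HEADER:
                start += 1
            self._buffer = self._buffer[start:]
            if len(self._buffer) < 5:
                break  # header not complete yet
            total = 5 + self._buffer[4] + 2  # header + body + checksum
            if len(self._buffer) < total:
                break  # wait for the rest of the frame
            frames.append(bytes(self._buffer[:total]))
            self._buffer = self._buffer[total:]
        return frames


buf = FrameBuffer()
frame = bytes([0xCF, 0x00, 0x00, 0x01, 0x02, 0x00, 0x07, 0xAA, 0xBB])  # made-up bytes
first = buf.feed(frame[:3])            # only part of the header has arrived
second = buf.feed(None)                # nothing was read this time
third = buf.feed(frame[3:] + frame)    # rest of frame 1 plus a whole frame 2
print(first, second, len(third))
```

In the loop of main.py, each item returned by `buf.feed(uart.read())` could then be passed to `Response`. A 0xCF byte inside a payload can still cause a false resynchronization after lost data, so the checksum assertion in `Response` remains the final check.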

Output after running main.py

Response -> Response(status: 0, tag: Tag(RSSI: -38, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -37, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -40, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -42, data: 12 34 56 78 90 12 34 56 78 90 A0 01))

Video

How to set the work mode is shown in the following video:

UPDATE: Raspberry Pi Pico W - Inventory TCP/IP (Ethernet)

The reader's Ethernet port is connected to a router. The Raspberry Pi Pico W connects via Wi-Fi.

3. main.py

from network import WLAN, STA_IF
from socket import socket, AF_INET, SOCK_STREAM
from time import sleep
from response import Response
 
 
SSID: str = "MY_SSID"
PASSWORD: str = "MY_PASSWORD"
READER_IP_ADDRESS: str = "192.168.1.215"
READER_PORT: int = 2022
 
 
def connect() -> str:
    wlan: WLAN = WLAN(STA_IF)
    wlan.active(True)
    wlan.connect(SSID, PASSWORD)
    while not wlan.isconnected():
        print('WLAN >> Waiting for connection...')
        sleep(1)
    ip_address: str = wlan.ifconfig()[0]
    print(wlan.ifconfig())
    return ip_address
 
 
ip_address: str = connect()
socket = socket(AF_INET, SOCK_STREAM)
socket.connect((READER_IP_ADDRESS, READER_PORT))
 
 
while True:
    # Read the first 5 bytes; the 5th byte (index 4) holds the body length.
    header_response: bytes = socket.recv(5)
 
    if len(header_response) < 5:
        continue
 
    body_length: int = header_response[-1]
    # + 2 for the checksum (2 bytes)
    body_response: bytes = socket.recv(body_length + 2)
 
    raw_response: bytes = header_response + body_response
    response: Response = Response(raw_response)
 
    print(f'Response -> {response}')
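
A TCP socket delivers a byte stream, so `recv(5)` may legitimately return fewer than 5 bytes, and an empty result means the peer closed the connection. A common way to handle this is a small helper that loops until the requested number of bytes has arrived. The sketch below is an illustration rather than part of the original tutorial. `recv_exact` and the `FakeSocket` stand-in used to exercise it are hypothetical names, and the frame bytes are made up.

```python
def recv_exact(sock, count: int) -> bytes:
    """Read exactly `count` bytes from `sock`, or raise if the peer closes."""
    data = bytearray()
    while len(data) < count:
        chunk = sock.recv(count - len(data))
        if not chunk:
            raise OSError("connection closed by reader")
        data.extend(chunk)
    return bytes(data)


class FakeSocket:
    """Stand-in for a real socket that returns at most 2 bytes per recv()."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload

    def recv(self, size: int) -> bytes:
        chunk = self._payload[:min(size, 2)]
        self._payload = self._payload[len(chunk):]
        return chunk


# Made-up frame: 5-byte header (length = 2), 2 body bytes, 2 checksum bytes.
fake = FakeSocket(bytes([0xCF, 0x00, 0x00, 0x01, 0x02, 0x00, 0x07, 0xAA, 0xBB]))
header = recv_exact(fake, 5)
body = recv_exact(fake, header[-1] + 2)
print(header, body)
```

With a helper like this, the two `recv` calls in the loop could be replaced by `recv_exact(socket, 5)` and `recv_exact(socket, body_length + 2)`, and the `OSError` could be caught to reconnect to the reader.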