# Inventory Active Mode Reader EL-UHF-RC4-2 Serial RS232 dengan Raspberry Pi Pico
Menggunakan Raspberry Pi Pico kita dapat mengambil hasil inventory Reader EL-UHF-RC4-2 dari interface Serial RS232-nya menggunakan Max3232.
# Persiapan
Berikut beberapa barang yang diperlukan:
- Reader Electron EL-UHF-RC4-2
- Sudah di set work mode: Active Mode, output interface RS232
- Raspberry Pi Pico (MicroPython)
- Max3232
- Jumper & Breadboard
- Dokumentasi protokol (opens new window)
# Koneksi antar Pin
Adapun koneksi antar Pin seperti pada gambar berikut.
Max3232 | Rasberry Pi Pico |
---|---|
VCC | 3V3 (Out) |
TXD | GP0 (UART0 - TX) |
RXD | GP1 (UART0 - RX) |
GND | GND |
Kabel jumper dari TXD (Max3232) ➜ GP0 (Raspberry Pi Pico) bersifat optional, karena Raspberry Pi Pico tidak mengirim data apapun ke Reader, mode Active Mode akan mengirim terus menerus dari Reader ke Raspberry Pi Pico ketika terdeteksi adanya tag.
Dikarenakan pada tutorial kali ini menggunakan Max3232 Female, maka kita memerlukan kabel DB9 Serial RS232 Male to Male (serial 2-3 Cross Connect). Untuk terhubung ke Max3232 Female ➜ RS232 Female Reader.
# Kode dan Penjelasan
Penjelasan blok/tiap byte command/response dapat dilihat di dokumentasi protokol.
Kode dibawah ini dipisah menjadi 4 file:
- response.py (opens new window): proses parsing penerimaan byte dari reader.
- utils.py (opens new window): fungsi umum yang digunakan.
- main.py (opens new window): file yang akan dijalankan Raspberry Pi Pico.
Jangan lupa upload dulu response.py
dan utils.py
ke Raspberry Pi Pico sebelum jalankan main.py
.
# 1. Response (response.py
)
from utils import hex_readable, calculate_checksum, calculate_rssi
class Tag:
def __init__(self, rssi: bytes, antenna: int, channel: int, data: bytes) -> None:
self.rssi: bytes = rssi
self.antenna: int = antenna
self.channel: int = channel
self.data: bytes = data
def __str__(self) -> str:
return f'Tag(RSSI: {str(calculate_rssi(self.rssi))[0:3]}, data: {hex_readable(self.data)})'
class Response:
def __init__(self, response: bytes) -> None:
self.tag: Tag | None = None
header_section: bytes = response[0:5]
assert header_section[0] == 0xCF # Header must 0xCF
length: int = response[4]
self.command: bytes = header_section[2:4]
body_n_checksum_section: bytes = response[5: 4 + length + 2 + 1] # length(N) + 2(checksum) + 1 (end of index)
self.status: int = body_n_checksum_section[0]
assert self.status == 0x00 # 0x00: Success (refer to Documentation)
body_section: bytes = body_n_checksum_section[1:-2]
checksum: bytes = response[-2:]
# Verify checksum
verify_checksum: bytearray = bytearray(header_section)
verify_checksum.extend(bytearray([body_n_checksum_section[0]]))
verify_checksum.extend(body_section)
crc_msb, crc_lsb = calculate_checksum(verify_checksum)
assert checksum[0] == crc_msb and checksum[1] == crc_lsb
tag_length: int = body_section[4]
tag_data: bytes = body_section[5: tag_length + 5]
assert(tag_length == len(tag_data))
self.tag = Tag(rssi=body_section[0:2], antenna=body_section[2], channel=body_section[3], data=body_section[5:])
def __str__(self) -> str:
return f'Response(status: {self.status}, tag: {self.tag})'
# 2. Utility (utils.py
)
def hex_readable(data_bytes: bytes, separator: str = " ") -> str:
return separator.join('{:02X}'.format(x) for x in data_bytes)
def calculate_checksum(data: bytes) -> bytearray:
value = 0xFFFF
for d in data:
value ^= d
for _ in range(8):
value = (value >> 1) ^ 0x8408 if value & 0x0001 else (value >> 1)
crc_msb = value >> 0x08
crc_lsb = value & 0xFF
return bytearray([crc_msb, crc_lsb])
def calculate_rssi(rssi: bytes) -> int: # Python: `int.from_bytes(rssi, "big", signed=True)`
length: int = len(rssi)
if not length:
return 0
if rssi[0] & 0x80:
complemented = bytes(0xFF - x for x in rssi)
return -(int.from_bytes(complemented, 'big') + 1)
else:
return int.from_bytes(rssi, 'big')
# 3. main.py
from machine import Pin, UART
from time import sleep
from response import Response
#pin_tx: Pin = Pin(0) # uart: UART = UART(0, baudrate=115_200, tx=pin_tx, rx=pin_rx)
pin_rx: Pin = Pin(1)
uart: UART = UART(0, baudrate=115_200, rx=pin_rx)
while True:
if not uart.any():
continue
# Get first 5 bytes. Because the length of data is in the 5th index.
header_response: bytes = uart.read(5)
if len(header_response) < 5:
continue
body_length: int = header_response[-1]
# + 2 for the checksum (2 bytes)
body_response: bytes = uart.read(body_length + 2)
raw_response: bytes = header_response + body_response
response: Response = Response(raw_response)
print(f'Response -> {response}')
# Hasil setelah main.py
dijalankan
Response -> Response(status: 0, tag: Tag(RSSI: -38, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -37, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -40, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -42, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
# Video
Cara atur work mode, dapat dilihat di video berikut:
# UPDATE: Raspberry Pi Pico W - Inventory TCP/IP (Ethernet)
Port Ethernet reader terhubung ke router. Raspberry Pi Pico W terkoneksi via Wi-Fi.
# 3. main.py
from network import WLAN, STA_IF
from socket import socket, AF_INET, SOCK_STREAM
from time import sleep
from response import Response
SSID: str = "MY_SSID"
PASSWORD: str = "MY_PASSWORD"
READER_IP_ADDRESS: str = "192.168.1.215"
READER_PORT: int = 2022
def connect() -> str:
wlan: WLAN = WLAN(STA_IF)
wlan.active(True)
wlan.connect(SSID, PASSWORD)
while wlan.isconnected() == False:
print('WLAN >> Waiting for connection...')
sleep(1)
ip_address: str = wlan.ifconfig()[0]
print(wlan.ifconfig())
return ip_address
ip_address: str = connect()
socket = socket(AF_INET, SOCK_STREAM)
socket.connect((READER_IP_ADDRESS, READER_PORT))
while True:
# Get first 5 bytes. Because the length of data is in the 5th index.
header_response: bytes = socket.recv(5)
if len(header_response) < 5:
continue
body_length: int = header_response[-1]
# + 2 for the checksum (2 bytes)
body_response: bytes = socket.recv(body_length + 2)
raw_response: bytes = header_response + body_response
response: Response = Response(raw_response)
print(f'Response -> {response}')