Menggunakan Raspberry Pi Pico kita dapat mengambil hasil inventory Reader EL-UHF-RC4-2 dari interface Serial RS232-nya menggunakan Max3232.
Persiapan §
Berikut beberapa barang yang diperlukan:
Koneksi antar Pin §
Adapun koneksi antar Pin seperti pada gambar berikut.
| Max3232 | Raspberry Pi Pico |
| --- | --- |
| VCC | 3V3 (Out) |
| TXD | GP0 (UART0 - TX) |
| RXD | GP1 (UART0 - RX) |
| GND | GND |
Kabel jumper dari TXD (Max3232) ➜ GP0 (Raspberry Pi Pico) bersifat opsional karena Raspberry Pi Pico tidak mengirim data apa pun ke Reader; pada Active Mode, Reader mengirim data terus-menerus ke Raspberry Pi Pico setiap kali ada tag yang terdeteksi.
Karena tutorial kali ini menggunakan Max3232 Female, kita memerlukan kabel DB9 Serial RS232 Male to Male (serial 2-3 Cross Connect) untuk menghubungkan Max3232 Female ➜ RS232 Female Reader.
Kode dan Penjelasan §
Penjelasan blok/tiap byte command/response dapat dilihat di dokumentasi protokol.
Kode di bawah ini dipisah menjadi 3 file:
response.py: proses parsing penerimaan byte dari reader.
utils.py: fungsi umum yang digunakan.
main.py: file yang akan dijalankan Raspberry Pi Pico.
Jangan lupa upload dulu response.py dan utils.py ke Raspberry Pi Pico sebelum menjalankan main.py.
1. Response (response.py) §
from utils import hex_readable, calculate_checksum, calculate_rssi
class Tag:
    """One tag read as reported inside a reader response.

    The values are stored exactly as passed in by the caller; decoding of the
    RSSI bytes only happens when the tag is rendered with ``str()``.
    """

    def __init__(self, rssi: bytes, antenna: int, channel: int, data: bytes) -> None:
        """Store the raw fields of a tag read.

        Args:
            rssi: Raw RSSI bytes, decoded later by ``calculate_rssi``.
            antenna: Antenna byte taken from the response body.
            channel: Channel byte taken from the response body.
            data: Tag payload bytes.
        """
        self.rssi: bytes = rssi
        self.antenna: int = antenna
        self.channel: int = channel
        self.data: bytes = data

    def __str__(self) -> str:
        """Return ``Tag(RSSI: <value>, data: <hex bytes>)``."""
        # NOTE(review): only the first three characters of the decimal RSSI
        # are shown. Presumably this trims a value reported in tenths of a dBm
        # (e.g. -385 -> "-38"), but it would also cut "-1005" down to "-10".
        # Confirm the unit against the protocol documentation.
        rssi_text = str(calculate_rssi(self.rssi))[0:3]
        return f'Tag(RSSI: {rssi_text}, data: {hex_readable(self.data)})'
class Response:
    """Parsed reader response frame carrying one tag read.

    Frame layout as consumed by this parser::

        byte 0        header, must be 0xCF
        byte 1        not interpreted here
        bytes 2-3     command
        byte 4        length N of the data section
        byte 5        status, must be 0x00
        next N-1      body: RSSI (2), antenna (1), channel (1),
                      tag length (1), tag data
        last 2 bytes  checksum (MSB, LSB) over everything before it

    Attributes:
        command: The two command bytes of the header.
        status: The status byte (always 0x00 once construction succeeded).
        tag: The parsed ``Tag``.
    """

    def __init__(self, response: bytes) -> None:
        """Validate and parse a raw response frame.

        Args:
            response: The raw frame, starting at the header byte.

        Raises:
            AssertionError: If the frame is shorter than its length byte
                declares, the header is not 0xCF, the status is not 0x00,
                the checksum does not match, or the tag length is
                inconsistent with the body.
        """
        self.tag: Tag | None = None

        assert len(response) >= 5, 'Response is shorter than the 5-byte header'
        header_section: bytes = response[0:5]
        assert header_section[0] == 0xCF, 'Header must be 0xCF'
        length: int = header_section[4]
        self.command: bytes = header_section[2:4]

        # The data section (status + body) ends at data_end; the two checksum
        # bytes follow immediately. Positions come from the length byte, so
        # trailing bytes after the frame cannot shift them.
        data_end: int = 5 + length
        frame_end: int = data_end + 2
        assert length >= 1 and len(response) >= frame_end, 'Response is truncated'

        self.status: int = response[5]
        # 0x00: Success (refer to the protocol documentation)
        assert self.status == 0x00, 'Reader reported a non-success status'

        body_section: bytes = response[6:data_end]
        checksum: bytes = response[data_end:frame_end]

        # The checksum covers header, status and body.
        crc_msb, crc_lsb = calculate_checksum(response[0:data_end])
        assert checksum[0] == crc_msb and checksum[1] == crc_lsb, 'Checksum mismatch'

        assert len(body_section) >= 5, 'Body is too short to hold a tag'
        tag_length: int = body_section[4]
        tag_data: bytes = body_section[5:tag_length + 5]
        assert tag_length == len(tag_data), 'Tag data is shorter than declared'

        # Hand over exactly the declared tag bytes, not whatever follows them.
        self.tag = Tag(
            rssi=body_section[0:2],
            antenna=body_section[2],
            channel=body_section[3],
            data=tag_data,
        )

    def __str__(self) -> str:
        """Return ``Response(status: <status>, tag: <tag>)``."""
        return f'Response(status: {self.status}, tag: {self.tag})'
2. Utility (utils.py) §
def hex_readable(data_bytes: bytes, separator: str = " ") -> str:
    """Render bytes as uppercase two-digit hex values joined by *separator*.

    Args:
        data_bytes: The bytes (or any iterable of ints) to render.
        separator: Text placed between consecutive byte values.

    Returns:
        For example ``"12 34 A0"``; an empty string for empty input.
    """
    return separator.join('{:02X}'.format(value) for value in data_bytes)
def calculate_checksum(data: bytes) -> bytearray:
    """Compute the 16-bit checksum used to verify reader frames.

    The register starts at 0xFFFF; every input byte is XORed into the low
    byte and the register is then shifted right eight times, XORing with
    0x8408 whenever the bit shifted out is set. No final XOR is applied.

    Args:
        data: The bytes to checksum.

    Returns:
        A two-element bytearray ``[MSB, LSB]`` of the final register value.
    """
    value = 0xFFFF
    for byte in data:
        value ^= byte
        for _ in range(8):
            if value & 0x0001:
                value = (value >> 1) ^ 0x8408
            else:
                value >>= 1
    return bytearray([value >> 8, value & 0xFF])
def calculate_rssi(rssi: bytes) -> int:
    """Decode big-endian two's-complement bytes into a signed integer.

    Equivalent to CPython's ``int.from_bytes(rssi, "big", signed=True)``;
    it is written out by hand, presumably because the ``signed`` argument is
    not available on the target interpreter (TODO confirm).

    Args:
        rssi: The raw bytes; may be empty.

    Returns:
        The signed value, or 0 for empty input.
    """
    if not rssi:
        return 0

    value = int.from_bytes(rssi, 'big')
    if rssi[0] & 0x80:
        # Sign bit set: subtract 2**bits to get the negative value.
        value -= 1 << (8 * len(rssi))
    return value
3. main.py §
from machine import Pin, UART
from time import sleep
from response import Response
# The TX pin is optional because this script never sends anything to the
# reader. To wire it up as well, use instead:
#   pin_tx: Pin = Pin(0)
#   uart: UART = UART(0, baudrate=115_200, tx=pin_tx, rx=pin_rx)
pin_rx: Pin = Pin(1)
uart: UART = UART(0, baudrate=115_200, rx=pin_rx)

# First byte of every reader frame (the value Response insists on).
FRAME_HEADER = 0xCF


def _read_exact(count, retries=200):
    """Collect exactly *count* bytes from the UART, or None if they never arrive.

    ``uart.read`` may hand back fewer bytes than requested, or None when
    nothing is buffered, so the result is accumulated across several reads.
    Each empty read waits 1 ms and uses up one of *retries*.
    """
    collected = bytearray()
    while len(collected) < count:
        chunk = uart.read(count - len(collected))
        if chunk:
            collected.extend(chunk)
            continue
        if retries <= 0:
            return None
        retries -= 1
        sleep(0.001)
    return bytes(collected)


while True:
    if not uart.any():
        continue

    # Resynchronise on the header byte so that starting in the middle of a
    # frame (or after a corrupt one) does not misalign every later frame.
    first_byte = uart.read(1)
    if not first_byte or first_byte[0] != FRAME_HEADER:
        continue

    # The header is 5 bytes long; its last byte is the length of the data.
    header_rest = _read_exact(4)
    if header_rest is None:
        continue
    header_response: bytes = bytes(first_byte) + header_rest
    body_length: int = header_response[-1]

    # + 2 for the checksum (2 bytes)
    body_response = _read_exact(body_length + 2)
    if body_response is None:
        continue
    raw_response: bytes = header_response + body_response

    try:
        response: Response = Response(raw_response)
    except (AssertionError, IndexError) as error:
        # A corrupt or non-success frame must not stop the reading loop.
        print('Invalid response skipped -> ' + repr(error))
        continue
    print(f'Response -> {response}')
Hasil setelah main.py dijalankan §
Response -> Response(status: 0, tag: Tag(RSSI: -38, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -37, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -40, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Response -> Response(status: 0, tag: Tag(RSSI: -42, data: 12 34 56 78 90 12 34 56 78 90 A0 01))
Video §
VIDEO
Cara atur work mode, dapat dilihat di video berikut:
VIDEO
UPDATE: Raspberry Pi Pico W - Inventory TCP/IP (Ethernet) §
Port Ethernet reader terhubung ke router. Raspberry Pi Pico W terkoneksi via Wi-Fi.
3. main.py §
from network import WLAN , STA_IF
from socket import socket, AF_INET , SOCK_STREAM
from time import sleep
from response import Response
# Wi-Fi credentials passed to wlan.connect() in connect(); the values below
# are placeholders to be replaced.
SSID : str = "MY_SSID"
PASSWORD : str = "MY_PASSWORD"
# IP address and TCP port of the reader that the script connects its socket to.
READER_IP_ADDRESS : str = "192.168.1.215"
READER_PORT : int = 2022
def connect() -> str:
    """Join the Wi-Fi network given by SSID/PASSWORD and return the IP address.

    Blocks until the interface reports that it is connected, printing a
    progress line once per second while waiting. The full interface
    configuration is printed once connected.

    Returns:
        The first element of the interface configuration (the IP address).
    """
    wlan: WLAN = WLAN(STA_IF)
    wlan.active(True)
    wlan.connect(SSID, PASSWORD)

    while not wlan.isconnected():
        print('WLAN >> Waiting for connection...')
        sleep(1)

    # Query the configuration once and reuse it for both print and return.
    interface_config = wlan.ifconfig()
    print(interface_config)
    return interface_config[0]
def _recv_exact(connection, count: int) -> bytes:
    """Receive exactly *count* bytes from *connection*.

    ``recv`` may return fewer bytes than requested, so the result is
    accumulated until complete.

    Raises:
        OSError: If the peer closes the connection before *count* bytes
            arrived (``recv`` returned no data).
    """
    collected = bytearray()
    while len(collected) < count:
        chunk = connection.recv(count - len(collected))
        if not chunk:
            raise OSError('Reader closed the connection')
        collected.extend(chunk)
    return bytes(collected)


ip_address: str = connect()

# Named reader_socket so the imported socket class is not shadowed.
reader_socket = socket(AF_INET, SOCK_STREAM)
try:
    reader_socket.connect((READER_IP_ADDRESS, READER_PORT))
    while True:
        # Get first 5 bytes. Because the length of data is in the 5th index.
        header_response: bytes = _recv_exact(reader_socket, 5)
        body_length: int = header_response[-1]

        # + 2 for the checksum (2 bytes)
        body_response: bytes = _recv_exact(reader_socket, body_length + 2)
        raw_response: bytes = header_response + body_response

        try:
            response: Response = Response(raw_response)
        except (AssertionError, IndexError) as error:
            # An invalid or non-success frame must not stop the loop.
            print('Invalid response skipped -> ' + repr(error))
            continue
        print(f'Response -> {response}')
finally:
    # Release the connection when the loop ends with an error.
    reader_socket.close()