V0.1
This commit is contained in:
@@ -0,0 +1,74 @@
|
|||||||
|
from machine import UART, Pin
|
||||||
|
import time
|
||||||
|
|
||||||
|
class CH9120:
|
||||||
|
def __init__(self, uart):
|
||||||
|
self.uart = uart
|
||||||
|
self.MODE = 1 #0:TCP Server 1:TCP Client 2:UDP Server 3:UDP Client
|
||||||
|
self.GATEWAY = (192, 168, 11, 1) # GATEWAY
|
||||||
|
self.TARGET_IP = (47, 92, 129, 18) # TARGET_IP
|
||||||
|
self.LOCAL_IP = (192, 168, 10, 200) # LOCAL_IP
|
||||||
|
self.SUBNET_MASK = (255,255,252,0) # SUBNET_MASK
|
||||||
|
self.LOCAL_PORT = 1000 # LOCAL_PORT1
|
||||||
|
self.TARGET_PORT = 1883 # TARGET_PORT
|
||||||
|
self.BAUD_RATE = 115200 # BAUD_RATE
|
||||||
|
self.CFG = Pin(18, Pin.OUT,Pin.PULL_UP)
|
||||||
|
self.RST = Pin(19, Pin.OUT,Pin.PULL_UP)
|
||||||
|
|
||||||
|
def enter_config(self):
|
||||||
|
print("begin")
|
||||||
|
self.RST.value(1)
|
||||||
|
self.CFG.value(0)
|
||||||
|
time.sleep(0.5)
|
||||||
|
|
||||||
|
def exit_config(self):
|
||||||
|
self.uart.write(b'\x57\xab\x0D')
|
||||||
|
time.sleep(0.1)
|
||||||
|
self.uart.write(b'\x57\xab\x0E')
|
||||||
|
time.sleep(0.1)
|
||||||
|
self.uart.write(b'\x57\xab\x5E')
|
||||||
|
time.sleep(0.1)
|
||||||
|
self.CFG.value(1)
|
||||||
|
time.sleep(0.1)
|
||||||
|
print("end")
|
||||||
|
|
||||||
|
def set_mode(self,MODE):
|
||||||
|
self.MODE = MODE
|
||||||
|
self.uart.write(b'\x57\xab\x10' + self.MODE.to_bytes(1, 'little'))#Convert int to bytes
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def set_localIP(self,LOCAL_IP):
|
||||||
|
self.LOCAL_IP = LOCAL_IP
|
||||||
|
self.uart.write(b'\x57\xab\x11' + bytes(self.LOCAL_IP))#Converts the int tuple to bytes
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def set_subnetMask(self,SUBNET_MASK):
|
||||||
|
self.SUBNET_MASK = SUBNET_MASK
|
||||||
|
self.uart.write(b'\x57\xab\x12' + bytes(self.SUBNET_MASK))
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def set_gateway(self,GATEWAY):
|
||||||
|
self.GATEWAY = GATEWAY
|
||||||
|
self.uart.write(b'\x57\xab\x13' + bytes(self.GATEWAY))
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def set_localPort(self,LOCAL_PORT):
|
||||||
|
self.LOCAL_PORT = LOCAL_PORT
|
||||||
|
self.uart.write(b'\x57\xab\x14' + self.LOCAL_PORT.to_bytes(2, 'little'))
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def set_targetIP(self,TARGET_IP):
|
||||||
|
self.TARGET_IP = TARGET_IP
|
||||||
|
self.uart.write(b'\x57\xab\x15' + bytes(self.TARGET_IP))
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def set_targetPort(self,TARGET_PORT):
|
||||||
|
self.TARGET_PORT = TARGET_PORT
|
||||||
|
self.uart.write(b'\x57\xab\x16' + self.TARGET_PORT.to_bytes(2, 'little'))
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
|
def set_baudRate(self,BAUD_RATE):
|
||||||
|
self.BAUD_RATE = BAUD_RATE
|
||||||
|
self.uart.write(b'\x57\xab\x21' + self.BAUD_RATE.to_bytes(4, 'little'))
|
||||||
|
time.sleep(0.1)
|
||||||
|
|
||||||
@@ -0,0 +1,5 @@
|
|||||||
|
{
|
||||||
|
"ip": "10.10.201.32",
|
||||||
|
"subnet": "255.255.248.0",
|
||||||
|
"gateway": "10.10.201.1"
|
||||||
|
}
|
||||||
@@ -0,0 +1,156 @@
|
|||||||
|
import time
|
||||||
|
import rp2
|
||||||
|
from machine import Pin, UART
|
||||||
|
from ch9120 import CH9120
|
||||||
|
import ujson
|
||||||
|
import math
|
||||||
|
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
# --- CH9120 Network Configuration ---
|
||||||
|
MODE = 0
|
||||||
|
LOCAL_PORT1 = 80
|
||||||
|
BAUD_RATE = 9600
|
||||||
|
|
||||||
|
# --- Init UART ---
|
||||||
|
uart1 = UART(1, baudrate=9600, tx=Pin(20), rx=Pin(21))
|
||||||
|
|
||||||
|
# --- WS2812 RGB LED Control ---
|
||||||
|
@rp2.asm_pio(sideset_init=rp2.PIO.OUT_LOW, out_shiftdir=rp2.PIO.SHIFT_LEFT, autopull=True, pull_thresh=24)
|
||||||
|
def ws2812():
|
||||||
|
T1 = 2
|
||||||
|
T2 = 5
|
||||||
|
T3 = 3
|
||||||
|
wrap_target()
|
||||||
|
label("bitloop")
|
||||||
|
out(x, 1) .side(0) [T3 - 1]
|
||||||
|
jmp(not_x, "do_zero") .side(1) [T1 - 1]
|
||||||
|
jmp("bitloop") .side(1) [T2 - 1]
|
||||||
|
label("do_zero")
|
||||||
|
nop() .side(0) [T2 - 1]
|
||||||
|
wrap()
|
||||||
|
|
||||||
|
sm = rp2.StateMachine(0, ws2812, freq=8_000_000, sideset_base=Pin(25))
|
||||||
|
sm.active(1)
|
||||||
|
|
||||||
|
def hsv_to_rgb(h, s, v):
|
||||||
|
"""Convert HSV to RGB (each in range 0-255)."""
|
||||||
|
h = float(h)
|
||||||
|
s = float(s) / 255
|
||||||
|
v = float(v) / 255
|
||||||
|
c = v * s
|
||||||
|
x = c * (1 - abs((h / 60.0) % 2 - 1))
|
||||||
|
m = v - c
|
||||||
|
|
||||||
|
if h < 60:
|
||||||
|
rp, gp, bp = c, x, 0
|
||||||
|
elif h < 120:
|
||||||
|
rp, gp, bp = x, c, 0
|
||||||
|
elif h < 180:
|
||||||
|
rp, gp, bp = 0, c, x
|
||||||
|
elif h < 240:
|
||||||
|
rp, gp, bp = 0, x, c
|
||||||
|
elif h < 300:
|
||||||
|
rp, gp, bp = x, 0, c
|
||||||
|
else:
|
||||||
|
rp, gp, bp = c, 0, x
|
||||||
|
|
||||||
|
r = int((rp + m) * 255)
|
||||||
|
g = int((gp + m) * 255)
|
||||||
|
b = int((bp + m) * 255)
|
||||||
|
return r, g, b
|
||||||
|
|
||||||
|
def load_network_config():
|
||||||
|
try:
|
||||||
|
with open("config.json", "r") as f:
|
||||||
|
config = ujson.load(f)
|
||||||
|
return (
|
||||||
|
tuple(map(int, config["ip"].split("."))),
|
||||||
|
tuple(map(int, config["gateway"].split("."))),
|
||||||
|
tuple(map(int, config["subnet"].split(".")))
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
print("Failed to load config, using default:", e)
|
||||||
|
return (
|
||||||
|
(192, 168, 0, 45),
|
||||||
|
(192, 168, 0, 1),
|
||||||
|
(255, 255, 255, 0)
|
||||||
|
)
|
||||||
|
|
||||||
|
LOCAL_IP, GATEWAY, SUBNET_MASK = load_network_config()
|
||||||
|
print("Current IP:")
|
||||||
|
print(LOCAL_IP)
|
||||||
|
|
||||||
|
def save_network_config(ip, gateway, subnet):
|
||||||
|
config = {
|
||||||
|
"ip": ip,
|
||||||
|
"gateway": gateway,
|
||||||
|
"subnet": subnet
|
||||||
|
}
|
||||||
|
with open("config.json", "w") as f:
|
||||||
|
ujson.dump(config, f)
|
||||||
|
|
||||||
|
def ch9120_configure():
|
||||||
|
global uart1
|
||||||
|
ch9120 = CH9120(uart1)
|
||||||
|
ch9120.enter_config()
|
||||||
|
ch9120.set_mode(MODE)
|
||||||
|
ch9120.set_localIP(LOCAL_IP)
|
||||||
|
ch9120.set_subnetMask(SUBNET_MASK)
|
||||||
|
ch9120.set_gateway(GATEWAY)
|
||||||
|
ch9120.set_localPort(LOCAL_PORT1)
|
||||||
|
ch9120.set_baudRate(BAUD_RATE)
|
||||||
|
ch9120.exit_config()
|
||||||
|
uart1.read(uart1.any())
|
||||||
|
time.sleep(0.5)
|
||||||
|
uart1 = UART(1, baudrate=BAUD_RATE, tx=Pin(20), rx=Pin(21))
|
||||||
|
print("CH9120 configured.")
|
||||||
|
|
||||||
|
def update_ip_configuration(ip, gateway, subnet):
|
||||||
|
global LOCAL_IP, GATEWAY, SUBNET_MASK, uart1
|
||||||
|
|
||||||
|
LOCAL_IP = tuple(map(int, ip.split('.')))
|
||||||
|
GATEWAY = tuple(map(int, gateway.split('.')))
|
||||||
|
SUBNET_MASK = tuple(map(int, subnet.split('.')))
|
||||||
|
|
||||||
|
print("Applying new IP configuration:")
|
||||||
|
print("IP Address:", LOCAL_IP)
|
||||||
|
print("Gateway:", GATEWAY)
|
||||||
|
print("Subnet Mask:", SUBNET_MASK)
|
||||||
|
|
||||||
|
ch9120 = CH9120(uart1)
|
||||||
|
ch9120.enter_config()
|
||||||
|
ch9120.set_mode(MODE)
|
||||||
|
ch9120.set_localIP(LOCAL_IP)
|
||||||
|
ch9120.set_gateway(GATEWAY)
|
||||||
|
ch9120.set_subnetMask(SUBNET_MASK)
|
||||||
|
ch9120.set_localPort(LOCAL_PORT1)
|
||||||
|
ch9120.set_baudRate(BAUD_RATE)
|
||||||
|
ch9120.exit_config()
|
||||||
|
uart1.read(uart1.any())
|
||||||
|
|
||||||
|
uart1 = UART(1, baudrate=BAUD_RATE, tx=Pin(20), rx=Pin(21))
|
||||||
|
ch9120 = CH9120(uart1)
|
||||||
|
print("CH9120 reinitialized with new network settings.")
|
||||||
|
|
||||||
|
save_network_config(ip, gateway, subnet)
|
||||||
|
|
||||||
|
def get_uptime():
|
||||||
|
seconds = int(time.time() - start_time)
|
||||||
|
hrs = seconds // 3600
|
||||||
|
mins = (seconds % 3600) // 60
|
||||||
|
secs = seconds % 60
|
||||||
|
return f"{hrs:02d}:{mins:02d}:{secs:02d}"
|
||||||
|
|
||||||
|
def main():
|
||||||
|
ch9120_configure()
|
||||||
|
hue = 0
|
||||||
|
|
||||||
|
while True:
|
||||||
|
r, g, b = hsv_to_rgb(hue % 360, 255, 100) # hue cycles 0-360
|
||||||
|
rgb = (g << 24) | (r << 16) | (b << 8)
|
||||||
|
sm.put(rgb)
|
||||||
|
hue = (hue + 3) % 360 # Increase hue slowly for smoother transition
|
||||||
|
time.sleep(0.05)
|
||||||
|
|
||||||
|
main()
|
||||||
@@ -0,0 +1,119 @@
|
|||||||
|
import os
|
||||||
|
import json
|
||||||
|
import subprocess
|
||||||
|
import tkinter as tk
|
||||||
|
from tkinter import filedialog, messagebox
|
||||||
|
import serial.tools.list_ports
|
||||||
|
|
||||||
|
|
||||||
|
class RP2040UploaderApp:
|
||||||
|
def __init__(self, master):
|
||||||
|
self.master = master
|
||||||
|
master.title("RP2040 Uploader")
|
||||||
|
|
||||||
|
self.folder_path = os.path.join(os.getcwd(), "Resources")
|
||||||
|
self.config_path = os.path.join(self.folder_path, "config.json")
|
||||||
|
|
||||||
|
self.label = tk.Label(master, text="Searching for RP2040...")
|
||||||
|
self.label.pack(pady=10)
|
||||||
|
|
||||||
|
self.port = self.find_rp2040_port()
|
||||||
|
if self.port:
|
||||||
|
self.label.config(text=f"RP2040 detected on {self.port}")
|
||||||
|
else:
|
||||||
|
self.label.config(text="RP2040 not found.")
|
||||||
|
|
||||||
|
# Config entry fields
|
||||||
|
self.ip_entry = self.create_labeled_entry("IP Address:")
|
||||||
|
self.subnet_entry = self.create_labeled_entry("Subnet Mask:")
|
||||||
|
self.gateway_entry = self.create_labeled_entry("Gateway:")
|
||||||
|
|
||||||
|
self.load_config()
|
||||||
|
|
||||||
|
self.upload_button = tk.Button(master, text="Upload Files", command=self.upload_files, state=tk.NORMAL if self.port else tk.DISABLED)
|
||||||
|
self.upload_button.pack(pady=10)
|
||||||
|
|
||||||
|
def create_labeled_entry(self, label_text):
|
||||||
|
frame = tk.Frame(self.master)
|
||||||
|
frame.pack(pady=2)
|
||||||
|
label = tk.Label(frame, text=label_text)
|
||||||
|
label.pack(side=tk.LEFT)
|
||||||
|
entry = tk.Entry(frame, width=20)
|
||||||
|
entry.pack(side=tk.RIGHT)
|
||||||
|
return entry
|
||||||
|
|
||||||
|
def find_rp2040_port(self):
|
||||||
|
ports = serial.tools.list_ports.comports()
|
||||||
|
if not ports:
|
||||||
|
return None
|
||||||
|
|
||||||
|
# Look for common Pico descriptors
|
||||||
|
for port in ports:
|
||||||
|
desc = port.description.lower()
|
||||||
|
if any(keyword in desc for keyword in ["pico", "rp2", "raspberry", "board"]):
|
||||||
|
return port.device
|
||||||
|
|
||||||
|
# Fall back to the only port found
|
||||||
|
if len(ports) == 1:
|
||||||
|
return ports[0].device
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
def load_config(self):
|
||||||
|
if os.path.isfile(self.config_path):
|
||||||
|
try:
|
||||||
|
with open(self.config_path, "r") as f:
|
||||||
|
config = json.load(f)
|
||||||
|
self.ip_entry.insert(0, config.get("ip", ""))
|
||||||
|
self.subnet_entry.insert(0, config.get("subnet", ""))
|
||||||
|
self.gateway_entry.insert(0, config.get("gateway", ""))
|
||||||
|
except Exception as e:
|
||||||
|
messagebox.showerror("Error", f"Failed to read config.json:\n{e}")
|
||||||
|
else:
|
||||||
|
messagebox.showwarning("Missing File", f"No config.json found in {self.folder_path}")
|
||||||
|
|
||||||
|
def save_config(self):
|
||||||
|
config = {
|
||||||
|
"ip": self.ip_entry.get(),
|
||||||
|
"subnet": self.subnet_entry.get(),
|
||||||
|
"gateway": self.gateway_entry.get()
|
||||||
|
}
|
||||||
|
try:
|
||||||
|
with open(self.config_path, "w") as f:
|
||||||
|
json.dump(config, f, indent=4)
|
||||||
|
except Exception as e:
|
||||||
|
messagebox.showerror("Error", f"Failed to save config.json:\n{e}")
|
||||||
|
|
||||||
|
def upload_files(self):
|
||||||
|
if not os.path.isdir(self.folder_path):
|
||||||
|
messagebox.showerror("Error", f"Folder '{self.folder_path}' not found.")
|
||||||
|
return
|
||||||
|
|
||||||
|
self.save_config()
|
||||||
|
|
||||||
|
success = True
|
||||||
|
for filename in os.listdir(self.folder_path):
|
||||||
|
full_path = os.path.join(self.folder_path, filename)
|
||||||
|
if os.path.isfile(full_path):
|
||||||
|
self.label.config(text=f"Uploading {filename}...")
|
||||||
|
self.master.update()
|
||||||
|
result = subprocess.run(
|
||||||
|
["mpremote", "connect", self.port, "fs", "cp", full_path, f":{filename}"],
|
||||||
|
capture_output=True,
|
||||||
|
text=True
|
||||||
|
)
|
||||||
|
if result.returncode != 0:
|
||||||
|
success = False
|
||||||
|
messagebox.showerror("Upload Failed", f"Failed to upload {filename}:\n{result.stderr}")
|
||||||
|
break
|
||||||
|
|
||||||
|
if success:
|
||||||
|
self.label.config(text=f"DONE")
|
||||||
|
self.master.update()
|
||||||
|
messagebox.showinfo("Success", "All files uploaded successfully!")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
root = tk.Tk()
|
||||||
|
app = RP2040UploaderApp(root)
|
||||||
|
root.mainloop()
|
||||||
Reference in New Issue
Block a user