Linear Actuator and Uplink Disconnect

Hello,
Problem: I am having trouble with anvil.app disconnecting after receiving a command from a pico W for a function that requires a relay to be open for more than 30s. It appears there is time limit or the link is timing out.

Here is what I am doing: I have a hobby farm where I am trying to automate certain tasks just for fun and to be able to travel more. The project I am currently working on, involves using a pico W, a relay board and a linear actuator to open and shut a door to our goat barn. All of the wiring and actuators work. The code is have written even works as long as my commands are under 28s. I am not an expert in writing code, I just started playing with pi pico W’s about 6 months ago which was when i started learning how to write code. Any help you can provide would be very appreciated because i have tried everything i can think of

Problem more in depth: The code I have written allows me to remotely, press a button on the anvil app, which triggers a relay to be left open to power the actuator. The issue is, the code only seems to work as long as the command holds the relay open for 28s or shorter. If the command is around 29-30s long i get the following error:

anvil.server.UplinkDisconnectedError: Uplink disconnected

my pico W then disconnects and reconnects to the anvil app. if this error happens 3 times i get the following error and i have to power cycle the pico w in order for it to regain connection back to the app because the following error just keeps cycling:

Connecting to Anvil…
Exception in uplink reconnection loop:
Traceback (most recent call last):
File “anvil/pico.py”, line 154, in _connect_async
File “anvil/pico.py”, line 112, in _connect
File “async_websocket_client.py”, line 94, in handshake
OSError: [Errno 12] ENOMEM

Again, everything works fine as long as the functions only last 28s or shorter. I need to be able to leave the relays open for more then 30s in order to open and shut the door, along with other functions like “pre-heating” with heat tape in the winter months

Python Code:

import anvil.pico
import time
import network
import uasyncio as a
from machine import Pin

UPLINK_KEY = “[i put my uplink code in here]”

led = Pin(“LED”, Pin.OUT, value=1)
pin_OPEN = Pin(21, Pin.OUT, value=0)
pin_CLOSED = Pin(20, Pin.OUT, value=0)
pin_stop = Pin(19, Pin.OUT, value=0)

@anvil.pico.callable_async
async def pico_fn(n):
# Output will go to the Pico W serial port
print(f"Called local function with argument: {n}")
for i in range(10):
led.toggle()
await a.sleep_ms(50)
return n * 2

@anvil.pico.callable_async
async def OpenDoor_fn(n):
print(f"Called local function with argument: {n}")
for i in range(30):
pin_OPEN.on()
time.sleep(1)
pin_OPEN.off()
return n * 2

@anvil.pico.callable_async
async def Stop_fn(n):
print(f"Called local function with argument: {n}")
for i in range(180):
pin_stop.on()
time.sleep(1)
pin_stop.off()

@anvil.pico.callable_async
async def CloseDoor_fn(n):
print(f"Called local function with argument: {n}")
for i in range(30):
pin_CLOSED.on()
time.sleep(1)
pin_CLOSED.off()

anvil.pico.connect(UPLINK_KEY)
anvil.server.wait_forever()

Anvil Code:

from ._anvil_designer import Form1Template
from anvil import *
import anvil.server

class Form1(Form1Template):
def init(self, **properties):
# Set Form properties and Data Bindings.
self.init_components(**properties)

# Any code you write here will run before the form opens.

def button_1_click(self, **event_args):
“”“This method is called when the button is clicked”“”
anvil.server.call(‘pico_fn’, 0)

def button_2_click(self, **event_args):
“”“This method is called when the button is clicked”“”
anvil.server.call(‘OpenDoor_fn’, 0)

def button_3_click(self, **event_args):
“”“This method is called when the button is clicked”“”
anvil.server.call(‘Stop_fn’, 5)

def button_4_click(self, **event_args):
“”“This method is called when the button is clicked”“”
anvil.server.call(‘CloseDoor_fn’, 10)

Any help would be very appreciated

Hello! Did you find a solution? i am running through a similar problem right now. I am doing a remote controller for a rc car, via webpage using anvil, but every 40s it reconnects. And no command can be made until it ends or the same error as yoours, appears.

wait_forever will throw if your function lasts more than 30 s.
Instead, run a scheduler loop that executes tasks when they are due.
The callable adds tasks with timestamps, and the loop executes them.
Use a priority queue to keep tasks ordered and thread-safe.

import time, heapq, threading

tasks = []
condition = threading.Condition()

def schedule(when, command):
  with condition:
    heapq.heappush(tasks, (when, command))
    condition.notify()

def execute(command):
  print("EXEC:", command, "at", time.time())

def scheduler_loop():
  while True:
    with condition:
      while not tasks:
        condition.wait()
      when, command = tasks[0]
      now = time.time()
      if when > now:
        condition.wait(timeout=when - now)
        continue
      heapq.heappop(tasks)
    execute(command)

@anvil.server.callable
def open_door():
  t = time.time()
  schedule(t, ("set", "o1", 1))
  schedule(t + 2, ("set", "o1", 0))

scheduler_loop()
1 Like