PPP: Add send_text_message for #14.

Also a quick cleanup of the lte module code.
This commit is contained in:
Phil Howard 2024-10-04 15:41:00 +01:00
parent a8a28576dc
commit 7b6b1934f8

View File

@ -18,8 +18,8 @@ DEFAULT_UART_BAUD = const(460800)
class CellularError(Exception): class CellularError(Exception):
def __init__(self, message=None): def __init__(self, message=None):
self.message = "CellularError: " + message self.message = f"CellularError: {message}"
class LTE(): class LTE():
@ -60,7 +60,7 @@ class LTE():
try: try:
return self._send_at_command("AT+CICCID", 1) return self._send_at_command("AT+CICCID", 1)
except CellularError: except CellularError:
return None return None
def status(self): def status(self):
lte_status = self._send_at_command("AT+CEREG?", 1) lte_status = self._send_at_command("AT+CEREG?", 1)
@ -71,7 +71,7 @@ class LTE():
try: try:
response = self._send_at_command("AT+CSQ", 1) response = self._send_at_command("AT+CSQ", 1)
quality = int(response.split(":")[1].split(",")[0]) quality = int(response.split(":")[1].split(",")[0])
db = -113 + (2 * quality) # conversion as per AT command set datasheet db = -113 + (2 * quality) # conversion as per AT command set datasheet
return db return db
except CellularError: except CellularError:
pass pass
@ -106,11 +106,9 @@ class LTE():
# print(e) # print(e)
# Force PPP to use modem's default settings... # Force PPP to use modem's default settings...
#time.sleep(2.0)
self._flush_uart() self._flush_uart()
self._uart.write("ATD*99#\r") self._uart.write("ATD*99#\r")
self._uart.flush() self._uart.flush()
#time.sleep(2.0)
self._ppp = PPP(self._uart) self._ppp = PPP(self._uart)
self._ppp.connect() self._ppp.connect()
@ -130,8 +128,8 @@ class LTE():
# wait for the cellular module to respond to AT commands # wait for the cellular module to respond to AT commands
self._wait_ready() self._wait_ready()
self._send_at_command("ATE0") # disable local echo self._send_at_command("ATE0") # disable local echo
self._send_at_command(f"AT+CGDCONT=1,\"IP\",\"{self._apn}\"") # set apn and activate pdp context self._send_at_command(f"AT+CGDCONT=1,\"IP\",\"{self._apn}\"") # set apn and activate pdp context
# wait for roaming lte connection to be established # wait for roaming lte connection to be established
giveup = time.time() + timeout giveup = time.time() + timeout
@ -152,8 +150,9 @@ class LTE():
giveup = time.time() + timeout giveup = time.time() + timeout
while time.time() <= giveup: while time.time() <= giveup:
try: try:
# if __send_at_command doesn't throw an exception then we're good!
self._send_at_command("AT") self._send_at_command("AT")
return # if __send_at_command doesn't throw an exception then we're good! return
except CellularError as e: except CellularError as e:
print(e) print(e)
time.sleep(poll_time) time.sleep(poll_time)
@ -172,18 +171,12 @@ class LTE():
self._flush_uart() self._flush_uart()
self._uart.write(command + "\r") self._uart.write(command + "\r")
#print(f" - tx: {command}")
self._uart.flush() self._uart.flush()
status, data = self._read_result(result_lines, timeout=timeout) status, data = self._read_result(result_lines, timeout=timeout)
print(" -", command, status, data) print(" -", command, status, data)
if status == "TIMEOUT":
#print.error(" !", command, status, data)
raise CellularError(f"cellular module timed out for command {command}")
if status not in ["OK", "DOWNLOAD"]: if status not in ["OK", "DOWNLOAD"]:
#print(" !", command, status, data)
raise CellularError(f"non 'OK' or 'DOWNLOAD' result for command {command}") raise CellularError(f"non 'OK' or 'DOWNLOAD' result for command {command}")
if result_lines == 1: if result_lines == 1:
@ -199,7 +192,7 @@ class LTE():
timeout *= 1000 timeout *= 1000
while len(result) < result_lines or status is None: while len(result) < result_lines or status is None:
if (time.ticks_ms() - start) > timeout: if (time.ticks_ms() - start) > timeout:
return "TIMEOUT", [] raise CellularError("cellular module timed out")
line = self._uart.readline() line = self._uart.readline()
@ -213,3 +206,15 @@ class LTE():
return status, result return status, result
def send_text_message(self, recipient, body, timeout=5.0):
self._uart.write("AT+CMGS=\"")
self._uart.write(recipient.encode("ascii"))
self._uart.write(b"\"\r")
time.sleep(0.5)
self._uart.write(body.encode("ascii"))
self._uart.write(b"\x1a\r")
self._uart.flush()
status, _ = self._read_result(1, timeout=timeout)
return status == "OK"