PPP: Add send_text_message for #14.
Also a quick cleanup of the lte module code.
This commit is contained in:
parent
6c6a839903
commit
bfcb181cdf
@ -18,8 +18,8 @@ DEFAULT_UART_BAUD = const(460800)
|
||||
|
||||
|
||||
class CellularError(Exception):
|
||||
def __init__(self, message=None):
|
||||
self.message = "CellularError: " + message
|
||||
def __init__(self, message=None):
|
||||
self.message = f"CellularError: {message}"
|
||||
|
||||
|
||||
class LTE():
|
||||
@ -30,13 +30,13 @@ class LTE():
|
||||
DEFAULT_UART_ID,
|
||||
tx=Pin(DEFAULT_PIN_TX, Pin.OUT),
|
||||
rx=Pin(DEFAULT_PIN_RX, Pin.OUT))
|
||||
|
||||
|
||||
# Set PPP timeouts and rxbuf
|
||||
self._uart.init(
|
||||
timeout=DEFAULT_UART_TIMEOUT,
|
||||
timeout_char=DEFAULT_UART_TIMEOUT_CHAR,
|
||||
rxbuf=DEFAULT_UART_RXBUF)
|
||||
|
||||
|
||||
if not skip_reset:
|
||||
self._reset.value(0)
|
||||
time.sleep(1.0)
|
||||
@ -46,7 +46,7 @@ class LTE():
|
||||
self._led = netlight_led
|
||||
self._netlight = netlight_pin or Pin(DEFAULT_PIN_NETLIGHT, Pin.IN)
|
||||
self._netlight.irq(self._netlight_irq)
|
||||
|
||||
|
||||
def _netlight_irq(self, pin):
|
||||
self._led.value(pin.value())
|
||||
|
||||
@ -60,23 +60,23 @@ class LTE():
|
||||
try:
|
||||
return self._send_at_command("AT+CICCID", 1)
|
||||
except CellularError:
|
||||
return None
|
||||
return None
|
||||
|
||||
def status(self):
|
||||
lte_status = self._send_at_command("AT+CEREG?", 1)
|
||||
gsm_status = self._send_at_command("AT+CGREG?", 1)
|
||||
return lte_status, gsm_status
|
||||
|
||||
|
||||
def signal_quality(self):
|
||||
try:
|
||||
response = self._send_at_command("AT+CSQ", 1)
|
||||
quality = int(response.split(":")[1].split(",")[0])
|
||||
db = -113 + (2 * quality) # conversion as per AT command set datasheet
|
||||
db = -113 + (2 * quality) # conversion as per AT command set datasheet
|
||||
return db
|
||||
except CellularError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
def stop_ppp(self):
|
||||
self._ppp.disconnect()
|
||||
self._send_at_command(f"AT+IPR={DEFAULT_UART_STARTUP_BAUD}")
|
||||
@ -106,11 +106,9 @@ class LTE():
|
||||
# print(e)
|
||||
|
||||
# Force PPP to use modem's default settings...
|
||||
#time.sleep(2.0)
|
||||
self._flush_uart()
|
||||
self._uart.write("ATD*99#\r")
|
||||
self._uart.flush()
|
||||
#time.sleep(2.0)
|
||||
|
||||
self._ppp = PPP(self._uart)
|
||||
self._ppp.connect()
|
||||
@ -122,16 +120,16 @@ class LTE():
|
||||
def connect(self, timeout=60):
|
||||
print(" - setting up cellular uart")
|
||||
# connect to and flush the uart
|
||||
# consume any unsolicited messages first, we don't need those
|
||||
# consume any unsolicited messages first, we don't need those
|
||||
self._flush_uart()
|
||||
|
||||
print(" - waiting for cellular module to be ready")
|
||||
|
||||
# wait for the cellular module to respond to AT commands
|
||||
self._wait_ready()
|
||||
self._wait_ready()
|
||||
|
||||
self._send_at_command("ATE0") # disable local echo
|
||||
self._send_at_command(f"AT+CGDCONT=1,\"IP\",\"{self._apn}\"") # set apn and activate pdp context
|
||||
self._send_at_command("ATE0") # disable local echo
|
||||
self._send_at_command(f"AT+CGDCONT=1,\"IP\",\"{self._apn}\"") # set apn and activate pdp context
|
||||
|
||||
# wait for roaming lte connection to be established
|
||||
giveup = time.time() + timeout
|
||||
@ -140,10 +138,10 @@ class LTE():
|
||||
status = self._send_at_command("AT+CEREG?", 1)
|
||||
time.sleep(0.25)
|
||||
if time.time() > giveup:
|
||||
raise CellularError("timed out getting network registration")
|
||||
raise CellularError("timed out getting network registration")
|
||||
|
||||
# disable server and client certification validation
|
||||
self._send_at_command("AT+CSSLCFG=\"authmode\",0,0")
|
||||
self._send_at_command("AT+CSSLCFG=\"authmode\",0,0")
|
||||
self._send_at_command("AT+CSSLCFG=\"enableSNI\",0,1")
|
||||
|
||||
print(f" - SIM ICCID is {self.iccid()}")
|
||||
@ -152,13 +150,14 @@ class LTE():
|
||||
giveup = time.time() + timeout
|
||||
while time.time() <= giveup:
|
||||
try:
|
||||
# if __send_at_command doesn't throw an exception then we're good!
|
||||
self._send_at_command("AT")
|
||||
return # if __send_at_command doesn't throw an exception then we're good!
|
||||
return
|
||||
except CellularError as e:
|
||||
print(e)
|
||||
time.sleep(poll_time)
|
||||
|
||||
raise CellularError("timed out waiting for AT response")
|
||||
raise CellularError("timed out waiting for AT response")
|
||||
|
||||
def _flush_uart(self):
|
||||
self._uart.flush()
|
||||
@ -168,28 +167,22 @@ class LTE():
|
||||
time.sleep(0.25)
|
||||
|
||||
def _send_at_command(self, command, result_lines=0, timeout=5.0):
|
||||
# consume any unsolicited messages first, we don't need those
|
||||
# consume any unsolicited messages first, we don't need those
|
||||
self._flush_uart()
|
||||
|
||||
self._uart.write(command + "\r")
|
||||
#print(f" - tx: {command}")
|
||||
self._uart.flush()
|
||||
status, data = self._read_result(result_lines, timeout=timeout)
|
||||
|
||||
print(" -", command, status, data)
|
||||
|
||||
if status == "TIMEOUT":
|
||||
#print.error(" !", command, status, data)
|
||||
raise CellularError(f"cellular module timed out for command {command}")
|
||||
|
||||
if status not in ["OK", "DOWNLOAD"]:
|
||||
#print(" !", command, status, data)
|
||||
raise CellularError(f"non 'OK' or 'DOWNLOAD' result for command {command}")
|
||||
|
||||
if result_lines == 1:
|
||||
return data[0]
|
||||
if result_lines > 1:
|
||||
return data
|
||||
return data
|
||||
return None
|
||||
|
||||
def _read_result(self, result_lines, timeout=1.0):
|
||||
@ -199,7 +192,7 @@ class LTE():
|
||||
timeout *= 1000
|
||||
while len(result) < result_lines or status is None:
|
||||
if (time.ticks_ms() - start) > timeout:
|
||||
return "TIMEOUT", []
|
||||
raise CellularError("cellular module timed out")
|
||||
|
||||
line = self._uart.readline()
|
||||
|
||||
@ -213,3 +206,15 @@ class LTE():
|
||||
|
||||
return status, result
|
||||
|
||||
def send_text_message(self, recipient, body, timeout=5.0):
|
||||
self._uart.write("AT+CMGS=\"")
|
||||
self._uart.write(recipient.encode("ascii"))
|
||||
self._uart.write(b"\"\r")
|
||||
time.sleep(0.5)
|
||||
self._uart.write(body.encode("ascii"))
|
||||
self._uart.write(b"\x1a\r")
|
||||
self._uart.flush()
|
||||
|
||||
status, _ = self._read_result(1, timeout=timeout)
|
||||
|
||||
return status == "OK"
|
||||
|
||||
Loading…
Reference in New Issue
Block a user