Plug in stick, log in.
http://192.168.100.1/usbdebug.html
Reboot (unplug, replug)
adb shell
setprop service.adb.root 1; busybox killall adbd
adb reboot edl
edl rf uz801-stock.bin
Then download individual partitions.
edl rl uz801_stock --genxml
Reboot (unplug, replug)
adb reboot bootloader
cd OpenStick/flash
./flash.sh
Mount the modem.bin file: sudo mount -o loop ~/firmwares/uz801_stock/modem.bin /mnt/test
Then copy all the files over to the /lib/firmware folder on the stick:
On the stick: mkdir ~/fw
On the computer you're using to do the work: scp -r /mnt/test/image/* user@192.168.200.1:~/fw
on the stick: sudo mv /lib/firmware/wlan ~/fw
sudo rm -r /lib/firmware/*
sudo mv ~/fw/* /lib/firmware
Change the default modem settings depending on the SIM card and available network. The easiest approach may be to delete the connection profile shipped in the Debian image and start over with:
sudo nmcli con down lte
sudo nmcli con del lte
sudo nmcli connection add type gsm ifname 'wwan0qmi0' con-name lte apn ""
(note that leaving APN blank will allow the SIM to automatically acquire the APN)
———
sudo nmcli con down lte
sudo nmcli con modify lte gsm.apn web.sentel.com (shouldn’t be necessary in normal operation)
sudo nmcli con modify lte connection.autoconnect yes
sudo nmcli con modify lte ipv6.method disabled
sudo nmcli con modify lte connection.autoconnect-retries 0
sudo mmcli -m 0 --set-allowed-modes="4g|3g" --set-preferred-mode=4g (note this is mmcli, not nmcli)
sudo nmcli con up lte
The preferred mode gets reset to 3G every reboot, so the following is a bash script that waits for the modem to become available on boot, then changes preferred mode to 4G.
sudo nano /usr/local/bin/configure-modem.sh
#!/bin/bash

LOG_FILE="/var/log/modem-config.log"

log() {
    echo "$(date '+%Y-%m-%d %H:%M:%S') - $1" | tee -a "$LOG_FILE"
}

log "=== Modem configuration script started ==="

# Wait for modem to be available
log "Waiting for modem to be detected..."
for i in {1..60}; do
    MODEM_LIST=$(mmcli -L 2>&1)
    MODEM_CHECK=$?
    if [ $MODEM_CHECK -ne 0 ]; then
        log "Attempt $i/60: mmcli -L failed with exit code $MODEM_CHECK"
        log "Output: $MODEM_LIST"
    else
        log "Attempt $i/60: mmcli -L succeeded"
        if echo "$MODEM_LIST" | grep -q "Modem"; then
            log "Modem detected! Output: $MODEM_LIST"
            log "Waiting 5 seconds for modem to fully initialize..."
            sleep 5
            log "Configuring modem with: mmcli -m 0 --set-allowed-modes=\"4g|3g\" --set-preferred-mode=4g"
            CONFIG_OUTPUT=$(mmcli -m 0 --set-allowed-modes="4g|3g" --set-preferred-mode=4g 2>&1)
            CONFIG_EXIT=$?
            if [ $CONFIG_EXIT -eq 0 ]; then
                log "SUCCESS: Modem configured successfully"
                log "Output: $CONFIG_OUTPUT"
                exit 0
            else
                log "ERROR: Modem configuration failed with exit code $CONFIG_EXIT"
                log "Output: $CONFIG_OUTPUT"
                exit 1
            fi
        else
            log "Attempt $i/60: No modem found in output yet"
        fi
    fi
    sleep 1
done

log "ERROR: Timeout reached after 60 seconds. Modem not detected."
exit 1
sudo chmod +x /usr/local/bin/configure-modem.sh
sudo nano /etc/systemd/system/modem-config.service
[Unit]
Description=Configure Modem Network Modes
After=ModemManager.service
Wants=ModemManager.service
[Service]
Type=oneshot
ExecStart=/usr/local/bin/configure-modem.sh
StandardOutput=journal
StandardError=journal
SyslogIdentifier=modem-config
Restart=on-failure
RestartSec=10
StartLimitBurst=3
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable modem-config.service
*edit* 15NOV25: Edit /etc/apt/sources.list (sudo nano /etc/apt/sources.list) and change "stable" to "bookworm" so it doesn't pull trixie packages.
Then update the bookworm install with sudo apt update && sudo apt upgrade.
Make the user not need a sudo password: run sudo visudo, then replace %sudo ALL=(ALL:ALL) ALL with %sudo ALL=(ALL) NOPASSWD: ALL
(optional) Install Node-RED (before installing anything else, since the installer script requires 100MB of free RAM). Node-RED, Telegraf and VictoriaMetrics together seem to be a bit too much for the stick; it is unstable with less than 50MB RAM remaining, so any of the three can be left out.
bash <(curl -sL https://raw.githubusercontent.com/node-red/linux-installers/master/deb/update-nodejs-and-nodered)
Install mosquitto and configure it (adapted from here):
sudo apt install mosquitto
sudo nano /etc/mosquitto/mosquitto.conf and add the following:
listener 1883
allow_anonymous false
password_file /etc/mosquitto/passwd-file
Now establish logins and passwords for the various users of your MQTT broker (IoT devices, telegraf, admin):
sudo mosquitto_passwd -c /etc/mosquitto/passwd-file user (for the first user)
sudo mosquitto_passwd /etc/mosquitto/passwd-file user (for any users after the first)
Install VictoriaMetrics and configure it:
wget https://github.com/VictoriaMetrics/VictoriaMetrics/releases/download/v1.129.1/victoria-metrics-linux-arm64-v1.129.1.tar.gz
sudo tar -xvzf victoria-metrics-linux-arm64-v1.129.1.tar.gz -C /usr/local/bin
sudo useradd -s /usr/sbin/nologin victoriametrics
sudo mkdir -p /var/lib/victoria-metrics && sudo chown -R victoriametrics:victoriametrics /var/lib/victoria-metrics
Add a service file:
sudo nano /etc/systemd/system/victoriametrics.service
[Unit]
Description=VictoriaMetrics service
After=network.target
[Service]
Type=simple
User=victoriametrics
Group=victoriametrics
ExecStart=/usr/local/bin/victoria-metrics-prod -storageDataPath=/var/lib/victoria-metrics -retentionPeriod=365d -selfScrapeInterval=15m
SyslogIdentifier=victoriametrics
Restart=always
PrivateTmp=yes
ProtectHome=yes
NoNewPrivileges=yes
ProtectSystem=full
[Install]
WantedBy=multi-user.target
Activate service:
sudo systemctl daemon-reload && sudo systemctl enable --now victoriametrics.service
If needed, install telegraf and configure (from here)
cd ~/
wget -q https://repos.influxdata.com/influxdata-archive_compat.key
gpg --with-fingerprint --show-keys ./influxdata-archive_compat.key
cat influxdata-archive_compat.key | gpg --dearmor | sudo tee /etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg > /dev/null
echo 'deb [signed-by=/etc/apt/trusted.gpg.d/influxdata-archive_compat.gpg] https://repos.influxdata.com/debian stable main' | sudo tee /etc/apt/sources.list.d/influxdata.list
sudo rm -f /etc/apt/trusted.gpg.d/influxdb.gpg
sudo apt update
sudo apt install telegraf
Adjust telegraf.conf to receive data from MQTT (and system metrics if desired) and send it to VictoriaMetrics, which accepts the InfluxDB line protocol and is therefore a drop-in, low-memory replacement for InfluxDB.
sudo nano /etc/telegraf/telegraf.conf
# Telegraf Configuration
#
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## Maximum number of unwritten metrics per output. Increasing this value
## allows for longer periods of output downtime without dropping metrics at the
## cost of higher maximum memory usage.
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
precision = "0s"
## Log at debug level.
debug = true
## Log only error level messages.
# quiet = false
## Log target controls the destination for logs and can be one of "file",
## "stderr" or, on Windows, "eventlog". When set to "file", the output file
## is determined by the "logfile" setting.
logtarget = "file"
## Name of the file to be logged to when using the "file" logtarget. If set to
## the empty string then logs are written to stderr.
logfile = "/var/log/telegraf/telegraf.log"
## The logfile will be rotated when it becomes larger than the specified
## size. When set to 0 no size based rotation is performed.
logfile_rotation_max_size = "1MB"
## Maximum number of rotated archives to keep, any older logs are deleted.
## If set to -1, no archives are removed.
logfile_rotation_max_archives = 5
## Pick a timezone to use when logging or type 'local' for local time.
## Example: America/Chicago
log_with_timezone = "local"
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
[[outputs.influxdb]]
urls = ["http://127.0.0.1:8428"]
# # Configuration for sending metrics to InfluxDB
# [[outputs.influxdb]]
# ## The full HTTP or UDP URL for your InfluxDB instance.
# ##
# ## Multiple URLs can be specified for a single cluster, only ONE of the
# ## urls will be written to each interval.
# # urls = ["unix:///var/run/influxdb.sock"]
# # urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
# ## The target database for metrics; will be created as needed.
# ## For UDP url endpoint database needs to be configured on server side.
database = "telegraf"
##
# ## If true, no CREATE DATABASE queries will be sent. Set to true when using
# ## Telegraf with a user without permissions to create databases or when the
# ## database already exists.
skip_database_creation = false
#
# ## Name of existing retention policy to write to. Empty string writes to
# ## the default retention policy. Only takes effect when using HTTP.
# # retention_policy = ""
#
# ## HTTP Basic Auth
username = "telegraf"
password = "password"
#
###############################################################################
# INPUT PLUGINS #
###############################################################################
[[inputs.cpu]]
percpu = false
#[[inputs.disk]]
#ignore_fs = [ "tmpfs", "devtmpfs" ]
#[[inputs.diskio]]
#[[inputs.kernel]]
[[inputs.mem]]
#[[inputs.processes]]
#[[inputs.swap]]
[[inputs.system]]
[[inputs.net]]
fieldpass = [ "bytes*" ]
[[inputs.netstat]]
[[inputs.temp]]
#This section reads correct system uptime to VictoriaMetrics so grafana can graph it correctly.
#Normally telegraf [system] input refers to the build date/time on startup, which is months out of date.
[[inputs.exec]]
commands = ["sh -c \"awk '{print \\\"system uptime=\\\" int($1) \\\"i\\\"}' /proc/uptime\""]
timeout = "5s"
data_format = "influx"
name_override = "system_correct" # Different measurement name than normal system_uptime
[[inputs.mqtt_consumer]]
alias = "mqtt"
startup_error_behavior = "retry"
servers = ["tcp://127.0.0.1:1883"]
topics = [
"N/0281f08bbe75/system/0/#",
]
client_id = "telegraf"
username = "username"
password = "password"
data_format = "json"
Telegraf is a memory hog and chokes with too many MQTT metrics, so it's often easier to capture just a few metrics with Node-RED and send them directly to VictoriaMetrics. Leaving telegraf uninstalled frees enough RAM to run a Grafana server (an old version that does simple graphs and uses much less RAM). Another option is to install an old version of telegraf, which also uses much less RAM.
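If you do route a few metrics through Node-RED or a small script instead of telegraf, VictoriaMetrics exposes an InfluxDB-compatible /write endpoint on its usual port 8428, so you can POST line protocol to it directly. A minimal Python sketch; the measurement, field, and tag names here are made up for illustration:

```python
import time
from urllib.request import Request, urlopen

def to_line_protocol(measurement, fields, tags=None):
    """Render one measurement as an InfluxDB line-protocol string,
    e.g. 'power,site=shed volts=12.6,amps=1.2'."""
    tag_part = "".join(f",{k}={v}" for k, v in (tags or {}).items())
    field_part = ",".join(f"{k}={v}" for k, v in fields.items())
    return f"{measurement}{tag_part} {field_part}"

def push_to_victoria(line, url="http://127.0.0.1:8428/write"):
    """POST a line-protocol payload to VictoriaMetrics' InfluxDB-compatible endpoint."""
    req = Request(url, data=line.encode(), method="POST")
    return urlopen(req, timeout=10).status

line = to_line_protocol("power", {"volts": 12.6, "amps": 1.2}, {"site": "shed"})
print(line)  # power,site=shed volts=12.6,amps=1.2
# push_to_victoria(line)  # uncomment on the stick, with VictoriaMetrics running
```

The same payload shape is what the [[outputs.influxdb]] section of telegraf sends, which is why VictoriaMetrics works as a drop-in here.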
wget https://dl.influxdata.com/telegraf/releases/telegraf_1.8.3-1_arm64.deb
sudo dpkg -i telegraf_1.8.3-1_arm64.deb
# Telegraf Configuration
#
# Global tags can be specified here in key="value" format.
[global_tags]
# dc = "us-east-1" # will tag all metrics with dc=us-east-1
# rack = "1a"
## Environment variables can be used as tags, and throughout the config file
# user = "$USER"
# Configuration for telegraf agent
[agent]
## Default data collection interval for all inputs
interval = "10s"
## Rounds collection interval to 'interval'
## ie, if interval="10s" then always collect on :00, :10, :20, etc.
round_interval = true
## Telegraf will send metrics to outputs in batches of at most
## metric_batch_size metrics.
## This controls the size of writes that Telegraf sends to output plugins.
metric_batch_size = 1000
## Maximum number of unwritten metrics per output. Increasing this value
## allows for longer periods of output downtime without dropping metrics at the
## cost of higher maximum memory usage.
metric_buffer_limit = 10000
## Collection jitter is used to jitter the collection by a random amount.
## Each plugin will sleep for a random time within jitter before collecting.
## This can be used to avoid many plugins querying things like sysfs at the
## same time, which can have a measurable effect on the system.
collection_jitter = "0s"
## Default flushing interval for all outputs. Maximum flush_interval will be
## flush_interval + flush_jitter
flush_interval = "10s"
## Jitter the flush interval by a random amount. This is primarily to avoid
## large write spikes for users running a large number of telegraf instances.
## ie, a jitter of 5s and interval 10s means flushes will happen every 10-15s
flush_jitter = "0s"
## Precision will NOT be used for service inputs. It is up to each individual
## service input to set the timestamp at the appropriate precision.
precision = "0s"
## Log at debug level.
debug = true
## Log only error level messages.
# quiet = false
## Log target controls the destination for logs and can be one of "file",
## "stderr" or, on Windows, "eventlog". When set to "file", the output file
## is determined by the "logfile" setting.
# logtarget = "file"
## Name of the file to be logged to when using the "file" logtarget. If set to
## the empty string then logs are written to stderr.
# logfile = "/var/log/telegraf/telegraf.log"
## The logfile will be rotated when it becomes larger than the specified
## size. When set to 0 no size based rotation is performed.
# logfile_rotation_max_size = "1MB"
## Maximum number of rotated archives to keep, any older logs are deleted.
## If set to -1, no archives are removed.
# logfile_rotation_max_archives = 5
## Pick a timezone to use when logging or type 'local' for local time.
## Example: America/Chicago
# log_with_timezone = "local"
## Override default hostname, if empty use os.Hostname()
hostname = ""
## If set to true, do not set the "host" tag in the telegraf agent.
omit_hostname = false
###############################################################################
# OUTPUT PLUGINS #
###############################################################################
[[outputs.influxdb]]
urls = ["http://127.0.0.1:8428"]
# # Configuration for sending metrics to InfluxDB
# [[outputs.influxdb]]
# ## The full HTTP or UDP URL for your InfluxDB instance.
# ##
# ## Multiple URLs can be specified for a single cluster, only ONE of the
# ## urls will be written to each interval.
# # urls = ["unix:///var/run/influxdb.sock"]
# # urls = ["udp://127.0.0.1:8089"]
# urls = ["http://127.0.0.1:8086"]
# ## The target database for metrics; will be created as needed.
# ## For UDP url endpoint database needs to be configured on server side.
database = "telegraf"
##
# ## If true, no CREATE DATABASE queries will be sent. Set to true when using
# ## Telegraf with a user without permissions to create databases or when the
# ## database already exists.
skip_database_creation = false
#
# ## Name of existing retention policy to write to. Empty string writes to
# ## the default retention policy. Only takes effect when using HTTP.
# # retention_policy = ""
#
# ## HTTP Basic Auth
username = "telegraf"
password = "password"
#
###############################################################################
# INPUT PLUGINS #
###############################################################################
[[inputs.cpu]]
percpu = true
[[inputs.disk]]
ignore_fs = [ "tmpfs", "devtmpfs" ]
#[[inputs.diskio]]
#[[inputs.kernel]]
[[inputs.mem]]
#[[inputs.processes]]
#[[inputs.swap]]
[[inputs.system]]
[[inputs.net]]
fieldpass = [ "bytes*" ]
[[inputs.netstat]]
[[inputs.temp]]
#This section reads correct system uptime to VictoriaMetrics so grafana can graph it correctly.
#Normally telegraf [system] input refers to the build date/time on startup, which is months out of date.
[[inputs.exec]]
commands = ["sh -c \"awk '{print \\\"system uptime=\\\" int($1) \\\"i\\\"}' /proc/uptime\""]
timeout = "5s"
data_format = "influx"
name_override = "system_correct" # Different measurement name than normal system_uptime
[[inputs.mqtt_consumer]]
#alias = "mqtt"
#startup_error_behavior = "retry"
servers = ["tcp://127.0.0.1:1883"]
topics = [
"N/0281f08bbe75/system/0/#",
]
client_id = "telegraf"
username = "username"
password = "password"
data_format = "json"
Note the [inputs.exec] portion: it pulls uptime from /proc/uptime rather than telegraf's usual source. This is necessary because the system clock is only updated a few minutes after boot using the modem.
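As a sanity check, the awk one-liner's output can be reproduced in a few lines of Python; the sample values below stand in for the real contents of /proc/uptime:

```python
def uptime_line(proc_uptime_text):
    """Build the same line-protocol string the [inputs.exec] awk command
    emits: 'system uptime=<whole seconds>i' from the first field of /proc/uptime."""
    seconds = int(float(proc_uptime_text.split()[0]))
    return f"system uptime={seconds}i"

print(uptime_line("12345.67 23456.78"))  # system uptime=12345i
# On the stick, read the real file instead:
# with open("/proc/uptime") as f:
#     print(uptime_line(f.read()))
```

The trailing "i" marks the value as an integer in InfluxDB line protocol, which keeps the stored type consistent across writes.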
sudo apt-get install -y adduser libfontconfig1 musl
wget https://dl.grafana.com/enterprise/release/grafana-enterprise_6.7.0_arm64.deb
sudo dpkg -i grafana-enterprise_6.7.0_arm64.deb
Another lightweight option is Perses, but it has issues with CORS.
wget https://github.com/perses/perses/releases/download/v0.53.0-beta.2/perses_0.53.0-beta.2_linux_arm64.tar.gz
sudo mkdir -p /opt/perses && sudo tar -xvzf ~/perses_0.53.0-beta.2_linux_arm64.tar.gz -C /opt/perses
Install PiVPN:
curl -L https://install.pivpn.io | bash
The tun0 device needs to be created on each reboot, so create a script with sudo nano ~/maketun.sh:
#!/bin/bash
mkdir -p /dev/net
mknod /dev/net/tun c 10 200
chmod 600 /dev/net/tun
systemctl restart openvpn@server
sudo chmod 755 ~/maketun.sh
sudo crontab -e
add @reboot /home/user/maketun.sh
Create a reverse SSH tunnel as detailed here.
Install chrony and edit its config file:
sudo apt install chrony
sudo nano /etc/chrony/chrony.conf
#add to the bottom
allow 192.0.0.0/8
local stratum 8
manual
The ‘local stratum 8’ and ‘manual’ lines allow chrony to serve the system time, not just NTP sources. This is useful when there is no network connectivity and you are pulling system time from the cell tower using sudo mmcli -m 0 --time.
Limit journal size
sudo nano /etc/systemd/journald.conf
#add to bottom
SystemMaxUse=50M
RuntimeMaxUse=50M
sudo timedatectl set-timezone Africa/Dakar
Install cron so various things can be automated
sudo apt install cron
sudo crontab -e
Add cron jobs for checking data balance using USSD commands (see the script in the post about using USSD commands):
@reboot /home/user/maketun.sh
0 2 * * * /sbin/shutdown -r now
If you have a weak cell signal or the stick is working hard, it can overheat, causing reboots and general instability. One solution is to disable two of the four cores. Normal operation with this setup (apart from startup) idles at 0-10% CPU, so it's not a big deal. Edit the rc.local file as follows:
sudo nano /etc/rc.local
#!/bin/bash

main() {
    cleanup_wwan
    disable_cpu_cores
}

disable_cpu_cores() {
    echo 0 > /sys/devices/system/cpu/cpu2/online
    echo 0 > /sys/devices/system/cpu/cpu3/online
}

cleanup_wwan() {
    ip netns add null || true
    max_wait=20
    waited=0
    while [ $waited -lt $max_wait ]; do
        if ip link show wwan7 &>/dev/null; then
            break
        fi
        sleep 1
        waited=$((waited + 1))
    done
    for i in wwan1 wwan2 wwan3 wwan4 wwan5 wwan6 wwan7; do
        ip link set "$i" netns null
    done
    # this may be required on some boards to initialize the modem
    sleep 2
    if ! ip -4 addr show wwan0 | grep -q "inet "; then
        systemctl restart ModemManager
    fi
}

main

echo heartbeat > /sys/devices/platform/leds/leds/red:power/trigger
exit 0
If necessary, install fail2ban and configure it (not necessary if you're only exposing the OpenVPN port).
sudo apt install fail2ban
sudo cp /etc/fail2ban/jail.conf /etc/fail2ban/jail.local
sudo nano /etc/fail2ban/jail.local
After all the defaults, find the [sshd] section and enter the following:
enabled = yes
#whatever ips you know you want to always allow through
ignoreip = 127.0.0.1/8 ::1 56.78.0.0/16 12.34.0.0/16
bantime = 24h
maxretry = 2
findtime = 24h
port = ssh
logpath = SYSTEMD-JOURNAL
backend = systemd
If you’ll be running the stick as a data collection server in a remote location, it will need some way to access accurate time. If you have a data plan with the SIM card, it’s no problem and chrony will handle it. However, if you just want to log data and occasionally buy data credit and remote in, the system needs a way to get time. Fortunately, just the act of connecting the SIM card to the GSM network will give us access to relatively accurate time. Create a script as follows:
sudo nano ~/timecheck.sh
#!/bin/bash

SAVED_TIME_FILE="/home/user/saved_time"

# Function to check if modem is connected
is_connected() {
    sudo mmcli -m any | grep -q "connected\|connecting\|disconnecting\|registered"
    return $?
}

# Function to extract time from modem
get_modem_time() {
    sudo mmcli -m any --time | awk 'NR == 2 {
        print substr($0,23,4)"-"substr($0,28,2)"-"substr($0,31,2)" "substr($0,34,2)":"substr($0,37,2)":"substr($0,40,2)
    }'
}

abs_diff=100000

# Main loop
while [ "$abs_diff" -gt 5 ]; do
    if is_connected; then
        modem_time=$(get_modem_time)
        if [ -z "$modem_time" ]; then
            echo "Could not get modem time. Waiting..."
            sleep 60
            continue
        fi
        # Convert both times to epoch seconds
        modem_epoch=$(date -d "$modem_time" +%s 2>/dev/null)
        if [ $? -ne 0 ]; then
            echo "Invalid modem time format: $modem_time. Waiting..."
            sleep 60
            continue
        fi
        system_epoch=$(date +%s)
        # Calculate time difference in seconds
        diff=$((system_epoch - modem_epoch))
        abs_diff=${diff#-} # Absolute value
        echo "Time difference: $abs_diff seconds"
        # If difference is greater than 5 seconds, update system time
        if [ "$abs_diff" -gt 5 ]; then
            echo "Stopping chrony.service..."
            sudo systemctl stop chrony.service
            echo "Updating system time to modem time: $modem_time"
            sudo timedatectl set-time "$modem_time"
            # Save the correct time to file for next boot
            date +%s > "$SAVED_TIME_FILE"
            echo "Saved current time ($(date +%s)) to $SAVED_TIME_FILE"
            echo "Starting chrony.service..."
            sudo systemctl start chrony.service
        else
            echo "System time is within 5 seconds of modem time. No update needed."
            # Still save the time even if no update was needed (time is correct)
            date +%s > "$SAVED_TIME_FILE"
            echo "Saved current time ($(date +%s)) to $SAVED_TIME_FILE"
        fi
    else
        echo "Modem is not connected."
    fi
    # Wait for 60 seconds
    sleep 60
done

# When loop exits (time is synced), save final timestamp
date +%s > "$SAVED_TIME_FILE"
echo "Final time saved to $SAVED_TIME_FILE"
sudo chmod 755 ~/timecheck.sh
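Note that the substr() offsets in get_modem_time() assume the timestamp always starts at the same column of the mmcli output, which can shift between ModemManager versions. This Python sketch mirrors the same fixed-offset slicing against a mocked line, so you can see exactly what the script extracts and verify the offsets against your own mmcli -m any --time output:

```python
def parse_mmcli_time(line):
    """Extract 'YYYY-MM-DD HH:MM:SS' using the same fixed character
    offsets as the awk substr() calls (1-based columns 23..41)."""
    return (f"{line[22:26]}-{line[27:29]}-{line[30:32]} "
            f"{line[33:35]}:{line[36:38]}:{line[39:41]}")

# A mocked line padded so the ISO timestamp starts at column 23,
# which is what the script assumes for `mmcli -m any --time` output:
sample = " " * 22 + "2024-01-15T10:30:45+01:00"
print(parse_mmcli_time(sample))  # 2024-01-15 10:30:45
```

If the columns don't line up on your version, adjust the offsets in timecheck.sh accordingly.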
sudo nano /etc/systemd/system/timecheck.service
[Unit]
Description=Modem Time Synchronization Service
After=network.target
[Service]
ExecStart=/home/user/timecheck.sh
Restart=always
RestartSec=10
User=root
[Install]
WantedBy=multi-user.target
enable the script with sudo systemctl daemon-reload and then sudo systemctl enable timecheck.service
Because the 4G stick does not have a hardware clock (RTC), we need to create a fake hardware clock for the system to reference at boot. This makes writing to time series databases more accurate.
sudo nano /home/user/restore-time.sh
#!/bin/bash
# /home/user/restore-time.sh

SAVED_TIME_FILE="/home/user/saved_time"

if [ -f "$SAVED_TIME_FILE" ]; then
    SAVED_TIMESTAMP=$(cat "$SAVED_TIME_FILE")
    if [ -n "$SAVED_TIMESTAMP" ] && [ "$SAVED_TIMESTAMP" -gt 0 ]; then
        # Set system time to saved timestamp
        sudo timedatectl set-time "@$SAVED_TIMESTAMP" 2>/dev/null || sudo date -s "@$SAVED_TIMESTAMP"
        echo "Restored time from last save: $(date)"
    else
        echo "Invalid saved timestamp in $SAVED_TIME_FILE"
    fi
else
    echo "No saved time found at $SAVED_TIME_FILE, using default system time"
fi
sudo chmod +x /home/user/restore-time.sh
sudo nano /etc/systemd/system/restore-time.service
[Unit]
Description=Restore saved time on boot
DefaultDependencies=no
Before=sysinit.target time-sync.target telegraf.service
After=local-fs.target
[Service]
Type=oneshot
ExecStart=/home/user/restore-time.sh
RemainAfterExit=yes
[Install]
WantedBy=sysinit.target
sudo systemctl daemon-reload
sudo systemctl enable restore-time.service
With the current version of Debian on this stick, there is a bug that prevents local clients from talking to each other. In my setup I need that access, so I start an SSH tunnel that forwards HTTP traffic via the stick.
ssh -L 8081:192.168.100.26:80 user@192.168.100.1
Note that the port 8081 doesn't matter, as long as it is larger than 1024 and doesn't conflict with any other services you're running. I can then access the other client in my local web browser at http://localhost:8081
If you don’t want to have to do any work on the client computer, you can have socat do it on the stick.
sudo apt install socat
sudo nano /etc/systemd/system/socat-bridge.service
[Unit]
Description=Socat HTTP Bridge
After=network.target
[Service]
Type=simple
ExecStart=/usr/bin/socat TCP-LISTEN:8081,fork,reuseaddr TCP:192.168.100.10:80
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable socat-bridge.service
sudo systemctl start socat-bridge.service
Now you can type in http://192.168.100.1:8081 on the client computer and the stick will pass the http traffic back and forth. Because the IP address of the remote server is fixed in the service file, you should add a static dhcp assignment for it in /etc/dnsmasq.d/dhcp.conf
dhcp-host=C8:C9:A3:10:B3:8B,192.168.100.10,infinite
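If you accumulate several static assignments, a quick sanity check of each dhcp-host= line catches MAC/IP typos before you restart dnsmasq. A sketch, assuming the stick's default 192.168.100.0/24 LAN:

```python
import ipaddress
import re

def check_dhcp_host(entry, subnet="192.168.100.0/24"):
    """Validate a dnsmasq dhcp-host= line: well-formed MAC address
    and an IP inside the expected LAN subnet."""
    body = entry.removeprefix("dhcp-host=")
    mac, ip = body.split(",")[:2]
    if not re.fullmatch(r"([0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}", mac):
        return False
    return ipaddress.ip_address(ip) in ipaddress.ip_network(subnet)

print(check_dhcp_host("dhcp-host=C8:C9:A3:10:B3:8B,192.168.100.10,infinite"))  # True
```

Requires Python 3.9+ for str.removeprefix(); the bookworm image ships a new enough interpreter.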
For monitoring network connectivity, the following will ping some DNS servers and log the data to VictoriaMetrics.
sudo apt install python3 iputils-ping
sudo nano /usr/local/bin/connectivity_monitor.py
#!/usr/bin/env python3
"""
Internet Connectivity Monitor
Checks internet connectivity every minute and pushes metrics to Prometheus/VictoriaMetrics
Uses only Python standard library - no external dependencies required
"""
import time
import socket
from datetime import datetime
import subprocess
import sys
from urllib.request import urlopen, Request
from urllib.error import URLError, HTTPError

# Configuration
PROMETHEUS_URL = "http://192.168.100.1:8428/api/v1/import/prometheus"
CHECK_INTERVAL = 60  # seconds
PING_HOSTS = [
    "8.8.8.8",  # Google DNS
    "1.1.1.1",  # Cloudflare DNS
    "8.8.4.4"   # Google DNS secondary
]
HTTP_CHECK_URL = "https://www.google.com"
TIMEOUT = 5  # seconds

def check_dns_resolution():
    """Check if DNS resolution is working"""
    try:
        socket.gethostbyname("www.google.com")
        return 1
    except socket.gaierror:
        return 0

def check_http_connectivity():
    """Check if HTTP/HTTPS connectivity is working"""
    try:
        request = Request(HTTP_CHECK_URL, headers={'User-Agent': 'Mozilla/5.0'})
        response = urlopen(request, timeout=TIMEOUT)
        return 1 if response.status == 200 else 0
    except (URLError, HTTPError, socket.timeout):
        return 0

def ping_host(host):
    """Ping a host and return latency in ms, or -1 if failed"""
    try:
        result = subprocess.run(
            ["ping", "-c", "1", "-W", str(TIMEOUT), host],
            capture_output=True,
            text=True,
            timeout=TIMEOUT + 1
        )
        if result.returncode == 0:
            # Extract latency from ping output
            for line in result.stdout.split('\n'):
                if 'time=' in line:
                    latency = line.split('time=')[1].split()[0]
                    return float(latency)
        return -1
    except (subprocess.TimeoutExpired, Exception):
        return -1

def collect_metrics():
    """Collect all connectivity metrics"""
    metrics = {}
    # DNS check
    metrics['internet_dns_working'] = check_dns_resolution()
    # HTTP connectivity check
    metrics['internet_http_working'] = check_http_connectivity()
    # Ping checks
    ping_success_count = 0
    total_latency = 0
    for host in PING_HOSTS:
        latency = ping_host(host)
        if latency >= 0:
            metrics[f'internet_ping_success{{host="{host}"}}'] = 1
            metrics[f'internet_ping_latency_ms{{host="{host}"}}'] = latency
            ping_success_count += 1
            total_latency += latency
        else:
            metrics[f'internet_ping_success{{host="{host}"}}'] = 0
            metrics[f'internet_ping_latency_ms{{host="{host}"}}'] = 0
    # Overall connectivity status (1 if at least one ping succeeds)
    metrics['internet_connected'] = 1 if ping_success_count > 0 else 0
    # Average latency (only if at least one ping succeeded)
    if ping_success_count > 0:
        metrics['internet_ping_latency_avg_ms'] = total_latency / ping_success_count
    else:
        metrics['internet_ping_latency_avg_ms'] = 0
    return metrics

def format_prometheus_metrics(metrics):
    """Format metrics in Prometheus text format"""
    lines = []
    timestamp = int(time.time() * 1000)  # milliseconds
    for metric_name, value in metrics.items():
        lines.append(f"{metric_name} {value} {timestamp}")
    return '\n'.join(lines)

def push_metrics(metrics):
    """Push metrics to Prometheus/VictoriaMetrics"""
    try:
        prometheus_data = format_prometheus_metrics(metrics)
        data = prometheus_data.encode('utf-8')
        request = Request(
            PROMETHEUS_URL,
            data=data,
            headers={'Content-Type': 'text/plain'},
            method='POST'
        )
        response = urlopen(request, timeout=10)
        if response.status in [200, 204]:
            print(f"[{datetime.now().isoformat()}] Metrics pushed successfully")
            return True
        else:
            print(f"[{datetime.now().isoformat()}] Failed to push metrics: HTTP {response.status}")
            return False
    except (URLError, HTTPError, socket.timeout) as e:
        print(f"[{datetime.now().isoformat()}] Error pushing metrics: {e}")
        return False

def main():
    """Main monitoring loop"""
    print("Starting Internet Connectivity Monitor")
    print(f"Target: {PROMETHEUS_URL}")
    print(f"Check interval: {CHECK_INTERVAL} seconds")
    print(f"Ping hosts: {', '.join(PING_HOSTS)}")
    print("-" * 60)
    while True:
        try:
            # Collect metrics
            metrics = collect_metrics()
            # Display status
            status = "ONLINE" if metrics['internet_connected'] == 1 else "OFFLINE"
            print(f"[{datetime.now().isoformat()}] Status: {status}")
            # Push to Prometheus
            push_metrics(metrics)
            # Wait for next check
            time.sleep(CHECK_INTERVAL)
        except KeyboardInterrupt:
            print("\nShutting down gracefully...")
            sys.exit(0)
        except Exception as e:
            print(f"[{datetime.now().isoformat()}] Unexpected error: {e}")
            time.sleep(CHECK_INTERVAL)

if __name__ == "__main__":
    main()
sudo chmod +x /usr/local/bin/connectivity_monitor.py
sudo nano /etc/systemd/system/connectivity-monitor.service
[Unit]
Description=Internet Connectivity Monitor
After=network.target
Wants=network.target
[Service]
Type=simple
ExecStart=/usr/bin/python3 /usr/local/bin/connectivity_monitor.py
Restart=always
RestartSec=10
User=root
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable connectivity-monitor
Following is a script to check the SIM data balance and expiration, log those stats to VictoriaMetrics, and send a weekly summary to an email address.
sudo apt install python3 python3-requests modemmanager
sudo nano /usr/local/bin/sim-balance-monitor.py
#!/usr/bin/env python3
"""
SIM Balance Monitor
Checks SIM card data balance daily and sends weekly email/SMS reports
"""
import subprocess
import re
import csv
import time
import smtplib
import socket
import os
from datetime import datetime, timedelta
from pathlib import Path
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import sys
import argparse
import logging
import configparser
# Default configuration file path
DEFAULT_CONFIG_FILE = '/etc/sim-balance-monitor.conf'
# Default values if not specified in config
DEFAULT_CONFIG = {
'modem_index': '0',
'data_file': '/var/log/sim-balance/balance-data.csv',
'pending_file': '/var/log/sim-balance/pending-report.txt',
'log_file': '/var/log/sim-balance/monitor.log',
'debug_mode': 'False',
'ussd_init': '#1234#',
'ussd_responses': '3,1',
'sms_ussd_init': '#123#',
'sms_ussd_responses': '0,3',
}
def load_config(config_file=DEFAULT_CONFIG_FILE):
"""Load configuration from file"""
config = configparser.ConfigParser()
if not Path(config_file).exists():
print(f"ERROR: Configuration file not found: {config_file}", file=sys.stderr)
print(f"Please create the configuration file at {config_file}", file=sys.stderr)
print(f"See the installation guide for the configuration template.", file=sys.stderr)
sys.exit(1)
try:
config.read(config_file)
except Exception as e:
print(f"ERROR: Failed to parse configuration file: {e}", file=sys.stderr)
sys.exit(1)
# Build configuration dictionary
cfg = {}
# General settings
# Ensure modem_index is stored as a string as mmcli expects a string
cfg['modem_index'] = config.get('General', 'modem_index', fallback=DEFAULT_CONFIG['modem_index'])
cfg['data_file'] = config.get('General', 'data_file', fallback=DEFAULT_CONFIG['data_file'])
cfg['pending_file'] = config.get('General', 'pending_file', fallback=DEFAULT_CONFIG['pending_file'])
cfg['log_file'] = config.get('General', 'log_file', fallback=DEFAULT_CONFIG['log_file'])
# Debug mode (fallback must be a real boolean; a string fallback like 'False' would be truthy)
cfg['debug_mode'] = config.getboolean('General', 'debug_mode', fallback=False)
# USSD codes
cfg['ussd_init'] = config.get('USSD', 'data_init', fallback=DEFAULT_CONFIG['ussd_init'])
cfg['ussd_responses'] = config.get('USSD', 'data_responses', fallback=DEFAULT_CONFIG['ussd_responses']).split(',')
cfg['sms_ussd_init'] = config.get('USSD', 'sms_init', fallback=DEFAULT_CONFIG['sms_ussd_init'])
cfg['sms_ussd_responses'] = config.get('USSD', 'sms_responses', fallback=DEFAULT_CONFIG['sms_ussd_responses']).split(',')
# Email settings
cfg['email_to'] = config.get('Email', 'to', fallback='')
cfg['email_from'] = config.get('Email', 'from', fallback=f'sim-monitor@{socket.gethostname()}')
cfg['smtp_host'] = config.get('Email', 'smtp_host', fallback='smtp.gmail.com')
cfg['smtp_port'] = int(config.get('Email', 'smtp_port', fallback='587'))
cfg['smtp_user'] = config.get('Email', 'smtp_user', fallback='')
cfg['smtp_password'] = config.get('Email', 'smtp_password', fallback='')
# SMS settings
cfg['sms_to'] = config.get('SMS', 'to', fallback='')
return cfg
# Global config variable
CONFIG = None
# Setup logging
def setup_logging():
log_dir = Path(CONFIG['log_file']).parent
log_dir.mkdir(parents=True, exist_ok=True)
# Set log level based on CONFIG['debug_mode']
log_level = logging.DEBUG if CONFIG['debug_mode'] else logging.INFO
logging.basicConfig(
level=log_level,
format='[%(asctime)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler(CONFIG['log_file']),
logging.StreamHandler()
]
)
class USSDSession:
"""Manages USSD communication with the modem"""
def __init__(self, modem_index):
self.modem_index = modem_index
def _run_mmcli(self, args, timeout=10):
"""Run mmcli command and return output"""
cmd = ['mmcli', '-m', str(self.modem_index)] + args
logging.debug(f"Executing mmcli command: {' '.join(cmd)}")
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout
)
return result.stdout, result.stderr, result.returncode
except subprocess.TimeoutExpired:
logging.error(f"Command timeout: {' '.join(cmd)}")
return None, "Timeout", 1
def cancel(self):
"""Cancel any active USSD session"""
logging.info("Attempting to cancel any active USSD session.")
self._run_mmcli(['--3gpp-ussd-cancel'], timeout=10)
def _get_ussd_status(self, output):
"""Extract USSD status from mmcli output."""
# Fixed regex to capture hyphens in status (e.g., 'user-response')
match = re.search(r'status:\s*([\w-]+)', output)
if match:
return match.group(1).strip()
return None
def _poll_for_response(self, timeout=10):
"""Polls mmcli --3gpp-ussd-status until status is user-response/terminated or timeout."""
start_time = time.time()
while time.time() - start_time < timeout:
logging.debug("Polling USSD status...")
stdout, stderr, rc = self._run_mmcli(['--3gpp-ussd-status'], timeout=10)
if rc != 0:
# Check for critical modem error
if 'invalid modem index' in stderr.lower() or 'modem not found' in stderr.lower():
logging.critical(f"CRITICAL ERROR during polling: Modem not found or invalid index: {stderr.strip()}")
return 'MODEM_NOT_FOUND'
# Treat other rc != 0 as non-critical but stop polling
logging.warning(f"USSD status check failed (non-critical, rc={rc}): {stderr.strip()}. Stopping poll.")
return None
# Check the session status
status = self._get_ussd_status(stdout)
# Stop polling when the status changes to a terminal/response state
if status in ['user-response', 'terminated']:
logging.info(f"Poll stopping: Status changed to {status}. Attempting to extract response...")
# Extract the response based on new priority logic
response = self._extract_response(stdout)
if not response and status == 'user-response':
logging.warning("Status is 'user-response' but no network request text was found.")
return response # Returns the response or None if extraction failed
logging.debug(f"Status is '{status}'. Polling again in 1 second...")
time.sleep(1)
logging.warning(f"USSD polling timed out after {timeout} seconds.")
return None
def initiate(self, code):
"""Initiate USSD session"""
logging.info(f"Initiating USSD: {code}")
stdout, stderr, rc = self._run_mmcli(['--3gpp-ussd-initiate', code])
# Log full output for debugging
if stdout:
logging.debug(f"STDOUT: {stdout}")
if stderr:
logging.debug(f"STDERR: {stderr}")
if rc != 0:
if 'invalid modem index' in stderr.lower() or 'modem not found' in stderr.lower():
logging.critical(f"CRITICAL ERROR: Modem not found or invalid index: {stderr}")
return 'MODEM_NOT_FOUND'
else:
logging.error(f"USSD initiate failed (non-critical): {stderr}")
return None
# Check for an immediate reply in the initiate STDOUT before falling back to polling
immediate_reply_match = re.search(r"new reply from network:\s*'(.*?)'", stdout, re.DOTALL)
if immediate_reply_match:
reply = immediate_reply_match.group(1).strip()
# Clean up the response
cleaned_reply = ' '.join(reply.split())
if cleaned_reply:
logging.info(f"Immediate Reply Found from Initiate (new reply from network): {cleaned_reply[:50]}...")
return cleaned_reply
# Add 1-second delay before first poll
logging.debug("Waiting 1 second for network response before starting poll.")
time.sleep(1)
# Start status polling immediately
logging.info("Starting USSD status polling...")
poll_result = self._poll_for_response()
return poll_result
def respond(self, response_code):
"""Respond to USSD prompt"""
logging.info(f"Responding: {response_code}")
stdout, stderr, rc = self._run_mmcli(['--3gpp-ussd-respond', response_code])
# Log full output for debugging
if stdout:
logging.debug(f"STDOUT: {stdout}")
if stderr:
logging.debug(f"STDERR: {stderr}")
if rc != 0:
if 'invalid modem index' in stderr.lower() or 'modem not found' in stderr.lower():
logging.critical(f"CRITICAL ERROR: Modem not found or invalid index: {stderr}")
return 'MODEM_NOT_FOUND'
else:
logging.error(f"USSD respond failed (non-critical): {stderr}")
return None
# Increased wait time to 1 second before starting to poll
logging.debug("Waiting 1 second for network response before starting poll.")
time.sleep(1)
# Start status polling immediately
logging.info("Starting USSD status polling...")
poll_result = self._poll_for_response()
return poll_result
def _extract_response(self, output):
"""
Extract response text, prioritizing 'new reply from network' and falling back to
'network request' (user-response field) from the mmcli --3gpp-ussd-status output.
"""
if not output:
logging.warning("No output from mmcli command")
return None
logging.debug(f"Raw mmcli output: {output.strip()}")
# Define fields to check in priority order: 1. 'new reply from network:', 2. 'network request:'
fields_to_check = [
('new reply from network:', "New Reply"),
('network request:', "Network Request/User-Response")
]
lines = output.split('\n')
for field_name, log_name in fields_to_check:
if field_name not in output:
continue # Skip if the field isn't even in the output
response_lines = []
capturing_request = False
logging.debug(f"Attempting to parse multi-line '{field_name}' output.")
for line in lines:
line_strip = line.strip()
if field_name in line and not capturing_request:
# Found the start of the field
capturing_request = True
# Extract text after the field name on the initial line
parts = line.split(field_name, 1)
if len(parts) > 1 and parts[1].strip():
response_lines.append(parts[1].strip())
elif capturing_request:
# Check for continuation lines (indented and usually starting with '|')
if line.strip().startswith('|'):
# The text starts after the pipe and any preceding whitespace
text = line.split('|', 1)
if len(text) > 1:
response_lines.append(text[1].strip())
elif not line_strip and response_lines:
# Stop capturing on a blank line if we've already found content
break
elif not line.startswith(' '):
# Stop capturing if indentation breaks (e.g., hit another section header)
# This covers the case where a new section (like 'Properties:' or 'status:') begins
break
if response_lines:
response = ' '.join(response_lines)
# Clean up the response (remove excessive whitespace, newlines)
response = ' '.join(response.split())
# Check for emptiness after cleanup (e.g., if it only contained whitespace/newlines)
if response:
logging.info(f"Extracted response ({log_name}): {response[:100]}...")
return response
# If neither field contained non-empty content
logging.warning("Could not extract USSD response from 'new reply from network' or 'network request' fields.")
return None
class BalanceChecker:
"""Handles balance checking and data storage"""
def __init__(self):
self.data_file = Path(CONFIG['data_file'])
self.data_file.parent.mkdir(parents=True, exist_ok=True)
self.ussd = USSDSession(CONFIG['modem_index'])
def check_data_balance(self):
"""Perform USSD sequence to check data balance"""
logging.info("Starting data balance check")
# Step 1: Cancel any existing session
self.ussd.cancel()
time.sleep(2)
# Step 2: Initial USSD
response = self.ussd.initiate(CONFIG['ussd_init'])
if response == 'MODEM_NOT_FOUND':
return 'MODEM_NOT_FOUND'
if not response:
logging.error("No response from initial USSD (non-critical failure)")
self.ussd.cancel()
return None
logging.info(f"Data Response 1: {response[:50]}...")
# Step 3: Follow-up responses
for i, resp_code in enumerate(CONFIG['ussd_responses'], 2):
time.sleep(3)
response = self.ussd.respond(resp_code)
if response == 'MODEM_NOT_FOUND':
return 'MODEM_NOT_FOUND'
if not response:
logging.error(f"No response from data step {i} (non-critical failure)")
self.ussd.cancel()
return None
logging.info(f"Data Response {i}: {response[:50]}...")
# Step 4: Final cancellation
self.ussd.cancel()
return response
def check_sms_balance(self):
"""Perform USSD sequence to check SMS balance"""
logging.info("Starting SMS balance check")
# Step 1: Cancel any existing session
self.ussd.cancel()
time.sleep(2)
# Step 2: Initial USSD
response = self.ussd.initiate(CONFIG['sms_ussd_init'])
if response == 'MODEM_NOT_FOUND':
return 'MODEM_NOT_FOUND'
if not response:
logging.error("No response from initial SMS USSD (non-critical failure)")
self.ussd.cancel()
return None
logging.info(f"SMS Response 1: {response[:50]}...")
# Step 3: Follow-up responses
for i, resp_code in enumerate(CONFIG['sms_ussd_responses'], 2):
time.sleep(3)
response = self.ussd.respond(resp_code)
if response == 'MODEM_NOT_FOUND':
return 'MODEM_NOT_FOUND'
if not response:
logging.error(f"No response from SMS step {i} (non-critical failure)")
self.ussd.cancel()
return None
logging.info(f"SMS Response {i}: {response[:50]}...")
# Step 4: Final cancellation
self.ussd.cancel()
return response
def parse_data_balance(self, response):
"""
Parse data balance and expiry from response using multiple patterns for robustness.
We attempt to convert all balances to MB for storage consistency.
"""
# --- Pattern 1: Specific (Mo/Go/MB/GB + jusqu'au + time) ---
# e.g., "6676 Mo valables jusqu'au 10/11/2025 à 17:51:29"
pattern_1 = r'(\d+\.?\d*)\s*(Mo|Go|MB|GB).*?jusqu\'au\s*(\d{2}/\d{2}/\d{2,4})\s*à\s*(\d{2}:\d{2}:\d{2})'
match = re.search(pattern_1, response, re.IGNORECASE | re.MULTILINE)
if match:
balance_val = float(match.group(1))
balance_unit = match.group(2).upper()
expiry_date = match.group(3)
expiry_time = match.group(4)
# Convert balance to MB/GB
balance_mb = int(balance_val * 1024) if balance_unit.startswith('G') else int(balance_val)
balance_gb = round(balance_mb / 1024, 2)
logging.info(f"Data Balance found (P1 - jusqu'au): {balance_mb} MB ({balance_gb} GB), Expires: {expiry_date} {expiry_time}")
return {
'balance_mb': balance_mb,
'balance_gb': balance_gb,
'expiry_date': expiry_date,
'expiry_time': expiry_time
}
# --- Pattern 2: Generic (MB/GB/Mo/Go + expire le/valable jusqu'au) ---
# e.g., "10.5 GB data bundle which expire le 05/03/2026"
pattern_2 = r'(\d+\.?\d*)\s*(MB|GB|Mo|Go).*?(expire\s*le|valable\s*jusqu\'au|valables\s*jusqu\'au)\s*(\d{2}/\d{2}/\d{2,4})'
match = re.search(pattern_2, response, re.IGNORECASE | re.MULTILINE)
if match:
balance_val = float(match.group(1))
balance_unit = match.group(2).upper()
expiry_date = match.group(4)
expiry_time = 'N/A' # Time not guaranteed by this pattern
# Convert balance to MB/GB
balance_mb = int(balance_val * 1024) if balance_unit.startswith('G') else int(balance_val)
balance_gb = round(balance_mb / 1024, 2)
logging.info(f"Data Balance found (P2 - expire le/jusqu'au): {balance_mb} MB ({balance_gb} GB), Expires: {expiry_date} {expiry_time}")
return {
'balance_mb': balance_mb,
'balance_gb': balance_gb,
'expiry_date': expiry_date,
'expiry_time': 'N/A'
}
# --- Pattern 3: Fallback (Just the Balance Value - No Expiry) ---
# e.g., "Your remaining balance is 4096 MB."
pattern_3 = r'(\d+\.?\d*)\s*(MB|GB|Mo|Go)'
match = re.search(pattern_3, response, re.IGNORECASE)
if match:
balance_val = float(match.group(1))
balance_unit = match.group(2).upper()
# Convert balance to MB/GB
balance_mb = int(balance_val * 1024) if balance_unit.startswith('G') else int(balance_val)
balance_gb = round(balance_mb / 1024, 2)
logging.warning(f"Data Balance found (P3 - Fallback): {balance_mb} MB ({balance_gb} GB). NO EXPIRY DATE FOUND.")
return {
'balance_mb': balance_mb,
'balance_gb': balance_gb,
'expiry_date': 'N/A',
'expiry_time': 'N/A'
}
logging.error(f"Could not parse data balance from: {response}")
return None
def parse_sms_balance(self, response):
"""Parse SMS balance and expiry from response"""
# Pattern: "935 SMS, valable jusqu'au 10/11/2025 17:51:29"
pattern = r'(\d+)\s*SMS.*?valable\s*jusqu\'au\s*(\d{2}/\d{2}/\d{4})'
match = re.search(pattern, response, re.IGNORECASE)
if match:
sms_count = int(match.group(1))
expiry_date = match.group(2)
logging.info(f"SMS Balance: {sms_count} SMS, Expires: {expiry_date}")
return {
'sms_count': sms_count,
'expiry_date': expiry_date,
'expiry_time': 'N/A' # Not reliably available in this pattern
}
# Fallback: Just the SMS count
pattern_fallback = r'(\d+)\s*SMS'
match = re.search(pattern_fallback, response, re.IGNORECASE)
if match:
sms_count = int(match.group(1))
logging.warning(f"SMS Balance found (Fallback): {sms_count} SMS. NO EXPIRY DATE FOUND.")
return {
'sms_count': sms_count,
'expiry_date': 'N/A',
'expiry_time': 'N/A'
}
logging.error(f"Could not parse SMS balance from: {response}")
return None
def save_balance(self, data_info, sms_info):
"""Save balance data to CSV"""
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Ensure 'ERROR' or 'N/A' is used for missing data fields
def get_value(info, key, default='ERROR'):
return str(info.get(key, default)) if info else default
data_mb = get_value(data_info, 'balance_mb')
data_gb = get_value(data_info, 'balance_gb')
data_expiry_date = get_value(data_info, 'expiry_date')
data_expiry_time = get_value(data_info, 'expiry_time')
sms_count = get_value(sms_info, 'sms_count')
sms_expiry_date = get_value(sms_info, 'expiry_date')
sms_expiry_time = get_value(sms_info, 'expiry_time')
# Only save if at least one check succeeded
if data_mb != 'ERROR' or sms_count != 'ERROR':
try:
with open(self.data_file, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([
timestamp,
data_mb,
data_gb,
data_expiry_date,
data_expiry_time,
sms_count,
sms_expiry_date,
sms_expiry_time
])
logging.info(f"Balance data saved to {self.data_file}")
except Exception as e:
logging.error(f"Failed to write to data file: {e}")
else:
logging.warning("No valid data or SMS info to save.")
def run(self):
"""
Run complete balance check.
Returns True if a CRITICAL 'MODEM_NOT_FOUND' error occurred, False otherwise.
"""
# --- Data Balance Check ---
data_response = self.check_data_balance()
if data_response == 'MODEM_NOT_FOUND':
return True # Signal Critical Failure
data_info = self.parse_data_balance(data_response) if data_response else None
# Wait a bit between checks
time.sleep(5)
# --- SMS Balance Check ---
sms_response = self.check_sms_balance()
if sms_response == 'MODEM_NOT_FOUND':
return True # Signal Critical Failure
sms_info = self.parse_sms_balance(sms_response) if sms_response else None
# --- Save Results ---
self.save_balance(data_info, sms_info)
# If we reached here, there was NO CRITICAL modem error.
return False # Signal Not Critical Failure
class ReportGenerator:
"""Generates reports from balance data"""
def __init__(self):
self.data_file = Path(CONFIG['data_file'])
def get_weekly_data(self):
"""Get balance data from last 7 days"""
if not self.data_file.exists():
return []
cutoff = datetime.now() - timedelta(days=7)
cutoff_str = cutoff.strftime('%Y-%m-%d')
data = []
with open(self.data_file, 'r') as f:
reader = csv.reader(f)
for row in reader:
# Assuming the first column is the timestamp
if row and row[0] >= cutoff_str:
data.append(row)
return data
def calculate_daily_usage(self, data):
"""Calculate average daily data usage"""
prev_balance = None
total_usage = 0
usage_days = 0
for row in data:
# Data MB is column 1 (index 1)
if len(row) > 1 and row[1] not in ['ERROR', 'N/A']:
try:
balance = float(row[1]) # Use float for MB balance
if prev_balance is not None:
daily_usage = prev_balance - balance
# Only count positive usage
if daily_usage >= 0:
total_usage += daily_usage
usage_days += 1
prev_balance = balance
except ValueError:
logging.debug(f"Skipping non-numeric data balance value: {row[1]}")
continue
return int(total_usage / usage_days) if usage_days > 0 else 0
def calculate_daily_sms_usage(self, data):
"""Calculate average daily SMS usage"""
prev_balance = None
total_usage = 0
usage_days = 0
for row in data:
# SMS count is column 5 (index 5)
if len(row) > 5 and row[5] not in ['ERROR', 'N/A']:
try:
balance = int(row[5])
if prev_balance is not None:
daily_usage = prev_balance - balance
if daily_usage >= 0:
total_usage += daily_usage
usage_days += 1
prev_balance = balance
except ValueError:
logging.debug(f"Skipping non-integer SMS balance value: {row[5]}")
continue
return total_usage // usage_days if usage_days > 0 else 0
def generate_email_report(self, data):
"""Generate detailed email report"""
report = "SIM Card Data Balance Report\n"
report += "=" * 80 + "\n\n"
report += "Report Period: Last 7 Days\n"
report += f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n"
if not data:
report += "No data collected during this period.\n"
return report
report += "Daily Balance History:\n"
report += "-" * 80 + "\n"
report += f"{'Date':<19} | {'Data Balance (MB)':<19} | {'SMS Count':<12} | {'Expiry Date'}\n"
report += "-" * 80 + "\n"
for row in data:
# CSV structure: [0]Timestamp, [1]MB, [2]GB, [3]D_Date, [4]D_Time, [5]SMS_Cnt, [6]S_Date, [7]S_Time
if len(row) >= 8:
timestamp = row[0]
balance_mb = row[1]
balance_gb = row[2]
data_expiry_date = row[3]
data_expiry_time = row[4]
sms_count = row[5]
else:
# Handle incomplete rows gracefully (shouldn't happen with the new save logic)
continue
data_str = f"{balance_mb} ({balance_gb}GB)" if balance_mb not in ['ERROR', 'N/A'] else 'ERROR'
sms_str = f"{sms_count}" if sms_count not in ['ERROR', 'N/A'] else 'N/A'
expiry_str = f"{data_expiry_date} {data_expiry_time}" if data_expiry_date != 'ERROR' else 'ERROR'
report += f"{timestamp:<19} | {data_str:<19} | {sms_str:<12} | {expiry_str}\n"
report += "-" * 80 + "\n\n"
# Statistics
valid_data_balances = [float(row[1]) for row in data if len(row) > 1 and row[1] not in ['ERROR', 'N/A']]
valid_sms_balances = [int(row[5]) for row in data if len(row) > 5 and row[5] not in ['ERROR', 'N/A']]
if valid_data_balances or valid_sms_balances:
report += "Summary Statistics:\n"
if valid_data_balances:
avg_data = sum(valid_data_balances) / len(valid_data_balances)
min_data = min(valid_data_balances)
# Round to 2 decimal places for better readability
latest_data = round(valid_data_balances[-1], 2)
min_data = round(min_data, 2)
avg_usage = self.calculate_daily_usage(data)
report += f" Data - Latest Balance: {latest_data} MB\n"
report += f" Data - Minimum Balance: {min_data} MB\n"
report += f" Data - Average Daily Usage: {avg_usage} MB/day\n"
if valid_sms_balances:
min_sms = min(valid_sms_balances)
report += f" SMS - Latest Balance: {valid_sms_balances[-1]} SMS\n"
report += f" SMS - Minimum Balance: {min_sms} SMS\n"
report += f" SMS - Average Daily Usage: {self.calculate_daily_sms_usage(data)} SMS/day\n"
return report
def generate_sms_report(self, data):
"""Generate compact SMS report"""
if not data:
return None
# Get latest data point
latest_row = data[-1]
if len(latest_row) < 8:
return None # Data is too incomplete
latest_data_mb = latest_row[1]
latest_sms_count = latest_row[5]
data_expiry_date = latest_row[3]
sms = f"SIM Wk Sum: "
if latest_data_mb not in ['ERROR', 'N/A']:
avg_usage = self.calculate_daily_usage(data)
# Format as GB for a shorter SMS if the value is high
data_val = f"{int(float(latest_data_mb))}MB"
if float(latest_data_mb) > 10000: # ~10GB
data_val = f"{float(latest_data_mb)/1024:.1f}GB"
sms += f"Data: {data_val} ({avg_usage}M/d)"
if data_expiry_date not in ['ERROR', 'N/A']:
expiry_compact = '/'.join(data_expiry_date.split('/')[:2])
sms += f" Exp:{expiry_compact}"
if latest_sms_count not in ['ERROR', 'N/A']:
sms += f" SMS: {latest_sms_count}"
if len(sms) > 160:
sms = sms[:157] + '...' # Truncate if necessary
return sms.strip()
class EmailSender:
"""Handles email sending via SMTP"""
def check_network(self):
"""Check if network is available"""
# Checks connection to Google and Cloudflare DNS servers
for host in ['8.8.8.8', '1.1.1.1']:
try:
socket.create_connection((host, 53), timeout=5)
return True
except OSError:
continue
return False
def send_email(self, subject, body):
"""Send email using SMTP"""
if not CONFIG['email_to']:
logging.warning("Email recipient not configured. Skipping email report.")
return False
if not self.check_network():
logging.warning("No network connectivity for email. Report pending.")
return False
try:
msg = MIMEMultipart()
msg['From'] = CONFIG['email_from']
msg['To'] = CONFIG['email_to']
msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
with smtplib.SMTP(CONFIG['smtp_host'], CONFIG['smtp_port'], timeout=30) as server:
server.starttls()
server.login(CONFIG['smtp_user'], CONFIG['smtp_password'])
server.send_message(msg)
logging.info("Email sent successfully")
return True
except Exception as e:
logging.error(f"Failed to send email: {e}")
return False
class SMSSender:
"""Handles SMS sending via mmcli"""
def __init__(self):
self.modem_index = CONFIG['modem_index']
def send_sms(self, message):
"""Send SMS using mmcli"""
if not CONFIG['sms_to']:
logging.warning("SMS recipient not configured. Skipping SMS report.")
return False
logging.info(f"Sending SMS ({len(message)} chars) to {CONFIG['sms_to']}")
# Step 1: Create the SMS
# Note: Use proper quoting for text with spaces
create_cmd = [
'mmcli', '-m', str(self.modem_index),
'--messaging-create-sms',
f'text="{message}",number="{CONFIG["sms_to"]}"'
]
try:
result = subprocess.run(
create_cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
logging.error(f"Failed to create SMS: {result.stderr.strip()}")
return False
# Extract SMS index from output
match = re.search(r'/SMS/(\d+)', result.stdout)
if not match:
logging.error(f"Could not extract SMS index from output: {result.stdout.strip()}")
return False
sms_index = match.group(1)
logging.debug(f"Created SMS with index: {sms_index}")
# Step 2: Send the SMS
send_cmd = [
'mmcli', '-s', sms_index,
'--send'
]
result = subprocess.run(
send_cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0:
logging.info(f"SMS sent successfully")
return True
else:
logging.error(f"Failed to send SMS: {result.stderr.strip()}")
return False
except Exception as e:
logging.error(f"SMS send error: {e}")
return False
class WeeklyReporter:
"""Manages weekly email and SMS reports"""
def __init__(self):
self.report_gen = ReportGenerator()
self.email_sender = EmailSender()
self.sms_sender = SMSSender()
self.pending_file = Path(CONFIG['pending_file'])
def send_reports(self):
"""Send both email and SMS reports"""
logging.info("Starting weekly reports")
# Get current week data
current_data = self.report_gen.get_weekly_data()
# Load pending data if exists
pending_data = []
if self.pending_file.exists():
logging.info("Found pending data from previous week(s)")
try:
with open(self.pending_file, 'r') as f:
reader = csv.reader(f)
pending_data = list(reader)
except Exception as e:
logging.error(f"Failed to load pending file: {e}")
# Combine data
all_data = pending_data + current_data
if not all_data:
logging.info("No data to report")
return
# Get hostname
hostname = socket.gethostname()
# Generate reports
email_body = self.report_gen.generate_email_report(all_data)
sms_text = self.report_gen.generate_sms_report(current_data)
# Send email and check for success
subject = f"Weekly SIM Balance Report - {hostname} {datetime.now().strftime('%Y-%m-%d')}"
email_success = self.email_sender.send_email(subject, email_body)
# Send SMS (fire-and-forget, but log success)
sms_success = True
if sms_text:
sms_success = self.sms_sender.send_sms(sms_text)
if email_success:
# Clear pending data on email success
if self.pending_file.exists():
self.pending_file.unlink()
logging.debug("Cleared pending data file.")
# Archive old data (keep 60 days)
self.archive_old_data()
else:
# Save all data (current + pending) for next week if email failed
logging.info("Email report failed. Saving all aggregated data for next week.")
try:
with open(self.pending_file, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerows(all_data)
logging.info(f"Saved {len(all_data)} entries to pending file.")
except Exception as e:
logging.error(f"Failed to save pending file: {e}")
def archive_old_data(self):
"""Keep only last 60 days of data"""
data_file = Path(CONFIG['data_file'])
if not data_file.exists():
return
cutoff = datetime.now() - timedelta(days=60)
cutoff_str = cutoff.strftime('%Y-%m-%d')
temp_file = data_file.with_suffix('.tmp')
try:
with open(data_file, 'r') as fin, open(temp_file, 'w', newline='') as fout:
reader = csv.reader(fin)
writer = csv.writer(fout)
rows_kept = 0
for row in reader:
if row and row[0] >= cutoff_str:
writer.writerow(row)
rows_kept += 1
temp_file.replace(data_file)
logging.info(f"Archived old data. {rows_kept} rows kept.")
except Exception as e:
logging.error(f"Archiving failed: {e}")
def main():
global CONFIG
parser = argparse.ArgumentParser(description='SIM Balance Monitor')
parser.add_argument('action', choices=['check', 'report'],
help='Action to perform: check (daily balance) or report (weekly summary)')
parser.add_argument('-c', '--config', default=DEFAULT_CONFIG_FILE,
help=f'Configuration file path (default: {DEFAULT_CONFIG_FILE})')
args = parser.parse_args()
# Load configuration FIRST
CONFIG = load_config(args.config)
# THEN setup logging (needs CONFIG to be loaded)
setup_logging()
logging.info(f"--- Script started: Action '{args.action}' ---")
try:
if args.action == 'check':
checker = BalanceChecker()
# is_critical_failure is True only if 'MODEM_NOT_FOUND' was detected
is_critical_failure = checker.run()
if is_critical_failure:
logging.critical("--- Check completed with CRITICAL Modem Error. Exiting 1. ---")
sys.exit(1)
else:
# All other results (success, USSD failure, timeout, parsing error) exit 0 (fail silently)
logging.info("--- Check completed (Modem OK). Exiting 0. ---")
sys.exit(0)
elif args.action == 'report':
reporter = WeeklyReporter()
reporter.send_reports()
logging.info("--- Report completed ---")
sys.exit(0)
except Exception as e:
logging.critical(f"Unexpected CRITICAL system error: {e}", exc_info=True)
sys.exit(1)
if __name__ == '__main__':
main()
sudo chmod 755 /usr/local/bin/sim-balance-monitor.py
sudo chown root:root /usr/local/bin/sim-balance-monitor.py
sudo nano /etc/sim-balance-monitor.conf
[General]
# Modem index (check with: mmcli -L)
modem_index = 0
debug_mode = false
# Data file paths
data_file = /var/log/sim-balance/balance-data.csv
pending_file = /var/log/sim-balance/pending-report.txt
log_file = /var/log/sim-balance/monitor.log
[USSD]
# Data balance check USSD codes
data_init = #1234#
data_responses = 2,3,1
# SMS balance check USSD codes
sms_init = #123#
sms_responses = 0,3
[Email]
# Email settings for reports
to = your-email@example.com
from = sim-monitor@your-server.com
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_user = your-email@gmail.com
smtp_password = your-app-password-here
[SMS]
# Phone number for SMS reports (with country code)
to = +221XXXXXXXXX
[Prometheus]
# Enable/disable Prometheus metrics (default: true)
enabled = true
# VictoriaMetrics or Prometheus Pushgateway URL
pushgateway = http://192.168.100.1:8428/api/v1/import/prometheus
# Job name for metrics
job_name = sim_balance_monitor
sudo chmod 600 /etc/sim-balance-monitor.conf
sudo chown root:root /etc/sim-balance-monitor.conf
sudo mkdir -p /var/log/sim-balance
sudo chmod 755 /var/log/sim-balance
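Before wiring everything into timers, it’s worth sanity-checking the parsing regexes against your carrier’s actual USSD text. A quick standalone test of the script’s first data-balance pattern — the sample response string below is made up, in the format the script’s comments describe:

```python
import re

# The script's primary data-balance pattern (Pattern 1).
pattern_1 = r"(\d+\.?\d*)\s*(Mo|Go|MB|GB).*?jusqu'au\s*(\d{2}/\d{2}/\d{2,4})\s*à\s*(\d{2}:\d{2}:\d{2})"

# Made-up sample response, mimicking the format shown in the script's comments.
sample = "Il vous reste 6676 Mo valables jusqu'au 10/11/2025 à 17:51:29"

m = re.search(pattern_1, sample, re.IGNORECASE | re.MULTILINE)
assert m is not None, "pattern did not match — adjust it to your carrier's text"
balance_val, unit, date, clock = m.group(1), m.group(2), m.group(3), m.group(4)
# Same unit conversion the script performs: Go/GB -> MB
balance_mb = int(float(balance_val) * 1024) if unit.upper().startswith("G") else int(float(balance_val))
print(balance_mb, date, clock)
```

If your carrier’s wording differs, tweak the pattern here first, then copy it into parse_data_balance.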
We need a wrapper script that retries the check a couple of times, then reboots if no modem is found (which happens occasionally). If the modem is still missing after the reboot, the wrapper disables the service so it doesn’t reboot-loop.
sudo nano /usr/local/bin/sim-balance-wrapper.sh
#!/bin/bash
# Configuration
SCRIPT_PATH="/usr/local/bin/sim-balance-monitor.py"
COUNTER_FILE="/home/user/fail_count" # Persistent failure counter path
SERVICE_NAME="sim-balance-check.service"
# --- Setup Persistent Counter Directory ---
# Note: For /home/user/fail_count, ensure the user running the service (root or user) has permissions.
mkdir -p "$(dirname "$COUNTER_FILE")"
if [ ! -f "$COUNTER_FILE" ]; then
echo 0 > "$COUNTER_FILE"
fi
# --- Function to run the script and handle the immediate retry ---
run_check() {
echo "Running balance check attempt 1..."
"$SCRIPT_PATH" check
local EXIT_CODE=$?
if [ $EXIT_CODE -ne 0 ]; then
echo "Check failed (exit $EXIT_CODE). Retrying in 5 minutes..."
# 5 minutes = 300 seconds
sleep 300
"$SCRIPT_PATH" check
EXIT_CODE=$?
echo "Retry finished with exit $EXIT_CODE."
fi
return $EXIT_CODE
}
# --- Execute the check with retry logic ---
run_check
FINAL_EXIT_CODE=$?
# --- Handle Final Exit Code ---
if [ $FINAL_EXIT_CODE -eq 0 ]; then
# SUCCESS: Reset counter
echo "Check successful. Resetting failure counter."
echo 0 > "$COUNTER_FILE"
exit 0
else
# CRITICAL FAILURE (exit code 1): Increment and check counter
CURRENT_COUNT=$(cat "$COUNTER_FILE")
NEXT_COUNT=$((CURRENT_COUNT + 1))
echo "Critical failure detected (exit $FINAL_EXIT_CODE). Current count: $CURRENT_COUNT. New count: $NEXT_COUNT."
echo "$NEXT_COUNT" > "$COUNTER_FILE"
if [ $NEXT_COUNT -eq 2 ]; then
# Second consecutive final failure: REBOOT
echo "Second consecutive critical failure. Rebooting system now..."
/sbin/reboot
elif [ $NEXT_COUNT -ge 3 ]; then
# Third consecutive final failure: DISABLE SERVICE
echo "Third consecutive critical failure. Disabling service and preventing future runs."
/usr/bin/systemctl disable --now "$SERVICE_NAME"
echo "Service $SERVICE_NAME is now disabled. Manual intervention is required."
# Keep the process running long enough for systemd to log the disable command
sleep 2
fi
exit $FINAL_EXIT_CODE
fi
sudo chmod +x /usr/local/bin/sim-balance-wrapper.sh
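The escalation policy the wrapper implements (reset the counter on success, reboot on the second consecutive failure, disable the service on the third) boils down to a small state transition. A sketch as a pure Python function, purely illustrative and not part of the wrapper itself:

```python
def next_action(prev_failures: int, check_ok: bool) -> tuple[int, str]:
    """Return (new_failure_count, action) for one wrapper run.

    Mirrors the wrapper's logic: success resets the counter; the second
    consecutive failure triggers a reboot; the third (or later) disables
    the service so it stops retrying until someone intervenes.
    """
    if check_ok:
        return 0, "none"
    count = prev_failures + 1
    if count == 2:
        return count, "reboot"
    if count >= 3:
        return count, "disable-service"
    return count, "none"
```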
sudo nano /etc/systemd/system/sim-balance-check.service
[Unit]
Description=Daily SIM Balance Check and Critical Failure Handler
Wants=network-online.target
After=network-online.target
[Service]
Type=oneshot
User=root
# Set the working directory for the script to find its config/data files if needed
WorkingDirectory=/usr/local/bin
ExecStart=/usr/local/bin/sim-balance-wrapper.sh
# Do NOT use Restart=, as the wrapper handles all retry/reboot logic.
[Install]
WantedBy=multi-user.target
sudo nano /etc/systemd/system/sim-balance-check.timer
[Unit]
Description=Run SIM Balance Check Daily
[Timer]
# Run the service daily at 6:00 AM
OnCalendar=*-*-* 06:00:00
# Persistent is set to false, meaning it won't run a missed job on reboot.
Persistent=false
[Install]
WantedBy=timers.target
sudo nano /etc/systemd/system/sim-balance-report.service
[Unit]
Description=SIM Balance Weekly Report
After=network.target ModemManager.service
Requires=ModemManager.service
[Service]
Type=oneshot
ExecStart=/usr/local/bin/sim-balance-monitor.py report
User=root
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
sudo nano /etc/systemd/system/sim-balance-report.timer
[Unit]
Description=SIM Balance Weekly Report Timer
Requires=sim-balance-report.service
[Timer]
# Run every Monday at 6:02 AM
OnCalendar=Mon *-*-* 06:02:00
Persistent=false
[Install]
WantedBy=timers.target
sudo systemctl daemon-reload
sudo systemctl enable sim-balance-check.timer
sudo systemctl enable sim-balance-report.timer
For testing, it’s nice to be able to send USSD commands. This is possible using mmcli, but a little cumbersome. I asked Claude.ai to write me a Python script to make it interactive, and it came up with the following, which works pretty well after a bit of debugging.
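For reference, the "cumbersome" mmcli route is a one-shot call per step (`--3gpp-ussd-initiate`, `--3gpp-ussd-respond`, `--3gpp-ussd-cancel`). A rough sketch of wrapping it from Python; the reply-parsing regex assumes mmcli's usual `reply from network: '…'` output line and is an untested guess:

```python
import re
import subprocess


def ussd_initiate(modem_index: int, code: str) -> str:
    """Send a USSD code with mmcli and return its raw stdout."""
    result = subprocess.run(
        ["sudo", "mmcli", "-m", str(modem_index), f"--3gpp-ussd-initiate={code}"],
        capture_output=True, text=True, check=True,
    )
    return result.stdout


def parse_ussd_reply(mmcli_output: str) -> str:
    """Pull the quoted network reply out of mmcli's output, if present."""
    m = re.search(r"reply from network: '(.*)'", mmcli_output, re.S)
    return m.group(1) if m else mmcli_output.strip()
```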
sudo apt install python3-dbus
sudo nano ~/ussd_modem.py
#!/usr/bin/env python3
"""
Improved USSD ModemManager Script
- Separates Logic (USSDClient) from UI (main)
- Validates Modem Capabilities
- Uses Type Hinting for clarity
- Robust Error Handling
- FIXED: Added explicit DBus type casting and pre-response state validation to ensure replies are correctly sent and received by the network.
- FIXED: Introduced a mandatory 0.5s pause after the DBus command if no immediate response is returned, to prevent race conditions and ensure the modem state updates correctly before polling begins.
- FIXED: The state stabilization poll now defaults to strictly waiting for STATE_USER_RESPONSE (3) to reduce ambiguity. STATE_IDLE (1) is caught immediately as a termination signal.
- FIXED: Removed redundant cancellation call inside the main loop to prevent double cancellation errors. The cleanup is now handled exclusively by the 'finally' block.
- UPDATED: The immediate exit on STATE_IDLE within _wait_for_stable_state has been removed, allowing the session to wait up to 30 seconds for a target state even if IDLE is seen, as requested. Note: This will slow down termination if the network closes the session.
- NEW: Added verbose debugging print statements to trace function calls and modem data exchange.
"""
import sys
import time
import dbus
from typing import Optional, Tuple, Dict, Any

# DBUS Constants
MM_BUS_NAME = 'org.freedesktop.ModemManager1'
MM_OBJ_PATH = '/org/freedesktop/ModemManager1'
MM_IFACE_OBJ_MANAGER = 'org.freedesktop.DBus.ObjectManager'
MM_IFACE_MODEM = 'org.freedesktop.ModemManager1.Modem'
MM_IFACE_3GPP_USSD = 'org.freedesktop.ModemManager1.Modem.Modem3gpp.Ussd'
DBUS_PROPS_IFACE = 'org.freedesktop.DBus.Properties'

# USSD State Codes
STATE_UNKNOWN = 0
STATE_IDLE = 1
STATE_ACTIVE = 2
STATE_USER_RESPONSE = 3


class USSDClient:
    """
    Handles the low-level DBus communication with ModemManager for USSD.
    """
    def __init__(self):
        self.bus = dbus.SystemBus()
        self.modem_path: Optional[str] = None
        self.modem_3gpp_ussd = None
        self.modem_props = None
        self.debug_enabled = True  # Control debugging output

    def _debug(self, message: str):
        """Helper for conditional debugging output."""
        if self.debug_enabled:
            print(f"[DEBUG] {message}", file=sys.stderr)

    def connect(self) -> bool:
        """
        Finds a modem that supports 3GPP USSD and connects to its interfaces.
        Returns True if successful.
        """
        self._debug("Attempting to connect to ModemManager...")
        try:
            manager = self.bus.get_object(MM_BUS_NAME, MM_OBJ_PATH)
            manager_iface = dbus.Interface(manager, MM_IFACE_OBJ_MANAGER)
            objects = manager_iface.GetManagedObjects()
            for path, interfaces in objects.items():
                # Check if this object is a Modem AND supports USSD
                if MM_IFACE_MODEM in interfaces and MM_IFACE_3GPP_USSD in interfaces:
                    self.modem_path = path
                    self._debug(f"Modem found at path: {path}")
                    modem_obj = self.bus.get_object(MM_BUS_NAME, self.modem_path)
                    self.modem_3gpp_ussd = dbus.Interface(modem_obj, MM_IFACE_3GPP_USSD)
                    self.modem_props = dbus.Interface(modem_obj, DBUS_PROPS_IFACE)
                    return True
            self._debug("No USSD-capable modem found.")
            return False
        except dbus.exceptions.DBusException as e:
            self._debug(f"DBus Error during connection: {e}")
            raise ConnectionError(f"DBus Error during connection: {e}")

    def get_state(self) -> Tuple[int, str]:
        """
        Get current USSD session state.
        Returns: (state_code, state_name)
        """
        try:
            state = self.modem_props.Get(MM_IFACE_3GPP_USSD, 'State')
            states = {
                STATE_UNKNOWN: 'Unknown',
                STATE_IDLE: 'Idle',
                STATE_ACTIVE: 'Active',
                STATE_USER_RESPONSE: 'User-Response'
            }
            state_name = states.get(state, 'Unknown')
            self._debug(f"Current modem state: {state_name} ({state})")
            return state, state_name
        except dbus.exceptions.DBusException:
            self._debug("Error reading modem state. Likely disconnected or interface unavailable.")
            return STATE_UNKNOWN, 'Unknown'

    def _read_network_request(self) -> Optional[str]:
        """Reads the NetworkRequest property (the message from the network)."""
        try:
            request = self.modem_props.Get(MM_IFACE_3GPP_USSD, 'NetworkRequest')
            self._debug(f"Reading NetworkRequest property: '{request}'")
            return request
        except dbus.exceptions.DBusException:
            self._debug("Error reading NetworkRequest property.")
            return None

    def _wait_for_stable_state(self, timeout: float = 30.0, target_states: Tuple[int, ...] = (STATE_USER_RESPONSE,)) -> int:
        """
        Polls until the state settles to one of the target states.
        Note: STATE_IDLE is now treated like any other non-target state during the poll.
        If the session terminates and goes to IDLE, the poll will now run for the full timeout.
        """
        state_names = {STATE_IDLE: 'Idle', STATE_ACTIVE: 'Active', STATE_USER_RESPONSE: 'User-Response'}
        target_names = [state_names.get(s, str(s)) for s in target_states]
        self._debug(f"Starting state stabilization poll (Timeout: {timeout}s, Target: {target_names})...")
        start = time.time()
        last_known_state = STATE_UNKNOWN
        while (time.time() - start) < timeout:
            state, state_name = self.get_state()
            last_known_state = state
            # The check for immediate IDLE exit has been removed as requested.
            if state in target_states:
                self._debug(f"State stabilized to: {state_name} ({state}) in {time.time() - start:.2f}s.")
                return state
            time.sleep(0.1)
        # If the loop times out, raise an error, but if the last state was IDLE,
        # return it to signal session closure gracefully.
        if last_known_state == STATE_IDLE:
            self._debug("Timed out, but last observed state was IDLE (Session ended).")
            return STATE_IDLE
        raise TimeoutError("Timed out waiting for network response.")

    def initiate_session(self, code: str) -> Tuple[str, int]:
        """
        Starts a USSD session.
        Returns the network response string and the final state code.
        """
        if not self.modem_3gpp_ussd:
            raise RuntimeError("Client not connected to a modem.")
        # Ensure code is a DBus String
        dbus_code = dbus.String(code)
        self._debug(f"Calling Initiate() with USSD code: {dbus_code} (DBus type: {type(dbus_code)})")
        # NOTE: ModemManager will try to return the response string if it's immediately available.
        response = self.modem_3gpp_ussd.Initiate(dbus_code, timeout=60)
        final_state = STATE_UNKNOWN
        self._debug(f"Initiate() returned immediately with: '{response}'")
        if not response:
            # Mandatory delay before polling begins (reverted to 0.5s)
            self._debug("No immediate response. Introducing 0.5s state stabilization delay...")
            time.sleep(0.5)
            # Wait for state change and read message from property. Uses default target (STATE_USER_RESPONSE)
            final_state = self._wait_for_stable_state()
            response = self._read_network_request()
        else:
            # If response is returned directly, get the resulting state immediately
            final_state, _ = self.get_state()
        return response if response else "", final_state

    def respond(self, reply_text: str) -> Tuple[str, int]:
        """
        Sends a reply to an active menu.
        Returns the next network response string and the final state code.
        """
        self._debug("Starting Respond sequence...")
        # 1. Pre-Check for stability: Ensure we are still in USER_RESPONSE state before sending.
        # We explicitly poll for both IDLE and USER_RESPONSE here to distinguish between
        # 'Ready to respond' (USER_RESPONSE) and 'Session terminated' (IDLE).
        current_state = self._wait_for_stable_state(timeout=5.0, target_states=(STATE_USER_RESPONSE, STATE_IDLE))
        if current_state == STATE_IDLE:
            raise BrokenPipeError("Session unexpectedly closed by network before response could be sent.")
        if current_state != STATE_USER_RESPONSE:
            raise BrokenPipeError(f"Modem not ready for response. State: {current_state}")
        # 2. Execute Response
        # Ensure reply_text is a DBus String
        dbus_reply = dbus.String(reply_text)
        self._debug(f"Calling Respond() with reply: '{dbus_reply}' (DBus type: {type(dbus_reply)})")
        # NOTE: ModemManager will try to return the response string if it's immediately available.
        response = self.modem_3gpp_ussd.Respond(dbus_reply, timeout=60)
        final_state = STATE_UNKNOWN
        self._debug(f"Respond() returned immediately with: '{response}'")
        if not response:
            # Mandatory delay before polling begins (reverted to 0.5s)
            self._debug("No immediate response. Introducing 0.5s state stabilization delay...")
            time.sleep(0.5)
            # Wait for state change and read message from property. Uses default target (STATE_USER_RESPONSE)
            final_state = self._wait_for_stable_state()
            response = self._read_network_request()
        else:
            # If response is returned directly, get the resulting state immediately
            final_state, _ = self.get_state()
        return response if response else "", final_state

    def cancel(self):
        """Cancels any active session."""
        self._debug("Attempting to cancel active USSD session...")
        if self.modem_3gpp_ussd:
            try:
                self.modem_3gpp_ussd.Cancel()
                self._debug("USSD session cancelled successfully.")
            except dbus.exceptions.DBusException as e:
                # The ModemManager API returns an error if you try to cancel when it's already idle.
                # We ignore this expected error, but log it for debugging transparency.
                self._debug(f"Cancel failed (likely already idle or internal protocol error): {e}")


def main():
    client = USSDClient()
    # 1. Connection Check (Exits on failure)
    print("--- Initializing Modem ---")
    try:
        if not client.connect():
            print("Error: No modem found that supports USSD.")
            sys.exit(1)
        print(f"Connected to modem: {client.modem_path}")
    except Exception as e:
        print(f"Connection failed: {e}")
        sys.exit(1)
    # 2. Main Interactive Session (Handles all exceptions/interrupts for cleanup)
    try:
        # Initial Request
        code = input("\nEnter USSD code (e.g. *123#): ").strip()
        if not code:
            return  # Exit cleanly
        print(f"Sending '{code}'...")
        response, current_state = client.initiate_session(code)
        print("\n" + "="*40)
        print(f"NETWORK RESPONSE:\n{response}")
        print("="*40 + "\n")
        # Interaction Loop
        while True:
            # Check the state immediately after the last action
            if current_state == STATE_IDLE:
                print("Session ended.")
                break
            if current_state != STATE_USER_RESPONSE:
                print(f"Warning: Modem state is {current_state} (not 'User-Response'). Exiting loop.")
                # No client.cancel() here! Let the finally block handle cleanup.
                break
            # State is STATE_USER_RESPONSE (3), prompt for reply
            user_input = input("Reply (or 'q' to quit): ").strip()
            if user_input.lower() in ['q', 'quit', 'exit']:
                # FIXED: Removed explicit client.cancel() here.
                print("Cancelled.")
                break
            if not user_input:
                continue
            print("Sending reply...")
            try:
                # Respond and get the new response and resulting state
                response, current_state = client.respond(user_input)
                print("\n" + "="*40)
                print(f"NETWORK RESPONSE:\n{response}")
                print("="*40 + "\n")
            except BrokenPipeError as e:
                print(f"Error: {e}")
                current_state = STATE_IDLE  # Force break
                continue  # Go to top of loop to hit the break condition
            except TimeoutError as e:
                print(f"Error: {e}")
                # The _wait_for_stable_state is designed to return STATE_IDLE on timeout if
                # that was the last observed state, which handles termination gracefully.
                current_state = STATE_IDLE
                continue  # Go to top of loop to hit the break condition
    except KeyboardInterrupt:
        print("\nInterrupted by user.")
    except Exception as e:
        # Catches DBus errors and others
        print(f"\nFatal Error: {e}")
    finally:
        # 3. Cleanup: This is the single, guaranteed place to call cancel()
        client.cancel()


if __name__ == "__main__":
    main()
Nice to have cell signal strength metrics also.
sudo nano /usr/local/bin/modem-to-vmetrics.sh
#!/bin/bash
# ModemManager to Victoria Metrics polling script
# Polls mmcli every 5 seconds and sends metrics to Victoria Metrics
VMETRICS_URL="http://localhost:8428/api/v1/import/prometheus"
INTERVAL=5
# Find first available modem
get_modem_id() {
mmcli -L 2>/dev/null | grep -oP '/Modem/\K\d+' | head -1
}
# Wait for ModemManager to be ready and modem to be available
echo "Waiting for modem to be available..."
while true; do
MODEM_ID=$(get_modem_id)
if [ -n "$MODEM_ID" ]; then
echo "Found modem at index: $MODEM_ID"
break
fi
sleep 2
done
# Main polling loop
echo "Starting modem signal monitoring (every ${INTERVAL}s)..."
while true; do
# Get all modem info at once
MODEM_INFO=$(mmcli -m "$MODEM_ID" --output-keyvalue 2>/dev/null)
# Get signal quality
SIGNAL=$(echo "$MODEM_INFO" | \
grep 'modem.generic.signal-quality.value' | \
cut -d: -f2 | \
tr -d ' ')
# Get access technology
TECH=$(echo "$MODEM_INFO" | \
grep 'modem.generic.access-technologies.value\[1\]' | \
cut -d: -f2 | \
tr -d ' ')
if [ -n "$SIGNAL" ]; then
# Prepare metrics
METRICS="modem_signal_quality{modem_id=\"$MODEM_ID\"} $SIGNAL"
# Add technology as a separate metric with technology label
if [ -n "$TECH" ]; then
METRICS="$METRICS
modem_access_technology{modem_id=\"$MODEM_ID\",technology=\"$TECH\"} 1"
TECH_DISPLAY=" ($TECH)"
else
TECH_DISPLAY=""
fi
# Send to Victoria Metrics in Prometheus format
echo "$METRICS" | curl -s -X POST "$VMETRICS_URL" --data-binary @- > /dev/null 2>&1
echo "$(date '+%Y-%m-%d %H:%M:%S') - Modem $MODEM_ID: Signal quality = $SIGNAL%$TECH_DISPLAY"
else
echo "$(date '+%Y-%m-%d %H:%M:%S') - Warning: Could not read signal quality"
fi
sleep "$INTERVAL"
done
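If you'd rather keep the metric formatting somewhere unit-testable, the grep/cut extraction above can be expressed in Python; the field names match mmcli's `--output-keyvalue` output as used in the script:

```python
def modem_metrics(keyvalue_output: str, modem_id: str = "0") -> str:
    """Turn `mmcli -m N --output-keyvalue` text into Prometheus exposition lines."""
    fields = {}
    for line in keyvalue_output.splitlines():
        if ":" not in line:
            continue
        key, _, value = line.partition(":")
        fields[key.strip()] = value.strip()
    lines = []
    signal = fields.get("modem.generic.signal-quality.value")
    if signal:
        lines.append(f'modem_signal_quality{{modem_id="{modem_id}"}} {signal}')
    tech = fields.get("modem.generic.access-technologies.value[1]")
    if tech:
        lines.append(
            f'modem_access_technology{{modem_id="{modem_id}",technology="{tech}"}} 1'
        )
    return "\n".join(lines)
```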
sudo chmod +x /usr/local/bin/modem-to-vmetrics.sh
sudo nano /etc/systemd/system/modem-vmetrics.service
[Unit]
Description=ModemManager to Victoria Metrics Exporter
After=ModemManager.service network.target
Requires=ModemManager.service
StartLimitIntervalSec=0
[Service]
Type=simple
ExecStart=/usr/local/bin/modem-to-vmetrics.sh
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable modem-vmetrics.service
A script to have the stick perform certain actions when an SMS is received. Four actions at the moment: lte up, lte down, reboot, and adding a wifi network via “wifi <ssid> <password>” instead of having to log in and add it via nmtui.
sudo nano sms_lte_control.py
#!/usr/bin/env python3
"""
SMS-based LTE Connection Controller
Polls GSM modem for SMS messages and controls NetworkManager LTE connection
"""
import subprocess
import time
import re
import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Configuration
POLL_INTERVAL = 60  # seconds
LTE_CONNECTION_NAME = "lte"


def run_command(cmd, check=True):
    """Execute a shell command and return output"""
    try:
        result = subprocess.run(
            cmd,
            shell=True,
            capture_output=True,
            text=True,
            check=check
        )
        return result.stdout.strip(), result.stderr.strip(), result.returncode
    except subprocess.CalledProcessError as e:
        logger.error(f"Command failed: {cmd}")
        logger.error(f"Error: {e.stderr}")
        return None, e.stderr, e.returncode


def get_modem_path():
    """Get the ModemManager modem path"""
    output, stderr, rc = run_command("mmcli -L")
    if rc != 0 or not output:
        return None
    # Extract modem path (e.g., /org/freedesktop/ModemManager1/Modem/0)
    match = re.search(r'/org/freedesktop/ModemManager1/Modem/\d+', output)
    if match:
        return match.group(0)
    return None


def list_sms_messages(modem_path):
    """List all SMS messages on the modem"""
    output, stderr, rc = run_command(f"mmcli -m {modem_path} --messaging-list-sms")
    if rc != 0 or not output:
        return []
    # Extract SMS paths
    sms_paths = re.findall(r'/org/freedesktop/ModemManager1/SMS/\d+', output)
    return sms_paths


def get_sms_info(sms_path):
    """Get SMS message info including PDU type and text"""
    output, stderr, rc = run_command(f"mmcli -s {sms_path} --output-keyvalue")
    if rc != 0 or not output:
        return None, None
    # Parse key-value output
    pdu_type = None
    text = None
    for line in output.split('\n'):
        if line.startswith('sms.properties.pdu-type'):
            pdu_type = line.split(':', 1)[1].strip()
        elif line.startswith('sms.content.text'):
            text = line.split(':', 1)[1].strip()
    logger.debug(f"Extracted PDU type: {pdu_type}, Text: {text}")
    return pdu_type, text


def delete_sms(modem_path, sms_path):
    """Delete an SMS message - try twice as ModemManager sometimes needs this"""
    # Extract SMS index from path (e.g., /org/freedesktop/ModemManager1/SMS/1 -> 1)
    sms_index = sms_path.split('/')[-1]
    cmd = f"sudo mmcli -m {modem_path} --messaging-delete-sms={sms_index}"
    # First attempt
    output, stderr, rc = run_command(cmd, check=False)
    if rc == 0:
        logger.info(f"Deleted SMS: {sms_path} (index: {sms_index})")
        return
    logger.debug(f"First delete attempt failed: {stderr}")
    # Wait 1 second before second attempt
    logger.debug("Waiting 1 second before retry...")
    time.sleep(1)
    # Second attempt
    output, stderr, rc = run_command(cmd, check=False)
    if rc == 0:
        logger.info(f"Deleted SMS on second attempt: {sms_path} (index: {sms_index})")
    else:
        logger.warning(f"Failed to delete SMS after two attempts: {sms_path} (index: {sms_index})")
        logger.warning(f"Error: {stderr}")


def control_lte_connection(action):
    """Control LTE connection via NetworkManager"""
    if action not in ["up", "down"]:
        logger.error(f"Invalid action: {action}")
        return False
    cmd = f"sudo nmcli con {action} {LTE_CONNECTION_NAME}"
    logger.info(f"Executing: {cmd}")
    output, stderr, rc = run_command(cmd, check=False)
    if rc == 0:
        logger.info(f"LTE connection {action} successful")
        return True
    else:
        logger.error(f"Failed to {action} LTE connection")
        return False


def reboot_system():
    """Reboot the system"""
    logger.warning("REBOOT command received - system will reboot in 5 seconds")
    time.sleep(5)  # Give time for SMS deletion and logging
    run_command("sudo reboot", check=False)


def add_wifi_profile(ssid, password):
    """Add a WiFi profile to NetworkManager"""
    logger.info(f"Adding WiFi profile for SSID: {ssid}")
    # Create WiFi connection with NetworkManager
    cmd = f"sudo nmcli dev wifi connect '{ssid}' password '{password}'"
    output, stderr, rc = run_command(cmd, check=False)
    if rc == 0:
        logger.info(f"Successfully added WiFi profile: {ssid}")
        return True
    else:
        logger.error(f"Failed to add WiFi profile: {ssid}")
        if stderr:
            logger.error(f"Error: {stderr}")
        return False


def process_sms_messages(modem_path):
    """Check for and process SMS messages"""
    sms_list = list_sms_messages(modem_path)
    if not sms_list:
        logger.debug("No SMS messages found")
        return
    logger.info(f"Found {len(sms_list)} SMS message(s)")
    for sms_path in sms_list:
        pdu_type, text = get_sms_info(sms_path)
        if not pdu_type:
            logger.warning(f"Could not determine PDU type for {sms_path}")
            continue
        logger.debug(f"SMS PDU type: {pdu_type}")
        # Only process 'deliver' type messages (received messages)
        if pdu_type.lower() != "deliver":
            logger.info(f"Skipping non-deliver SMS (type: {pdu_type})")
            continue
        if text:
            logger.info(f"SMS content: {text}")
            text_lower = text.lower()
            if "lte up" in text_lower:
                logger.info("Command detected: LTE UP")
                control_lte_connection("up")
            elif "lte down" in text_lower:
                logger.info("Command detected: LTE DOWN")
                control_lte_connection("down")
            elif "reboot" in text_lower:
                logger.info("Command detected: REBOOT")
                # Delete message before rebooting
                delete_sms(modem_path, sms_path)
                reboot_system()
                return  # Won't reach here, but for clarity
            elif text_lower.startswith("wifi "):
                logger.info("Command detected: WIFI")
                # Parse: wifi <ssid> <password>
                parts = text.split(None, 2)  # Split on whitespace, max 3 parts
                if len(parts) == 3:
                    _, ssid, password = parts
                    add_wifi_profile(ssid, password)
                else:
                    logger.warning("Invalid wifi command format. Expected: wifi <ssid> <password>")
            else:
                logger.info("No valid command in SMS")
        # Delete the processed message
        delete_sms(modem_path, sms_path)


def main():
    """Main loop"""
    logger.info("SMS-based LTE Controller started")
    logger.info(f"Polling interval: {POLL_INTERVAL} seconds")
    logger.info(f"LTE connection name: {LTE_CONNECTION_NAME}")
    while True:
        try:
            # Get modem path
            modem_path = get_modem_path()
            if not modem_path:
                logger.warning("No modem found, retrying...")
                time.sleep(POLL_INTERVAL)
                continue
            logger.debug(f"Modem path: {modem_path}")
            # Process SMS messages
            process_sms_messages(modem_path)
        except KeyboardInterrupt:
            logger.info("Shutting down...")
            break
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
        # Wait before next poll
        time.sleep(POLL_INTERVAL)


if __name__ == "__main__":
    main()
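The command matching the script does inline is easy to unit-test if pulled out into a pure function. A sketch of that refactor (hypothetical helper, not part of the script above), keeping the same precedence: lte up, lte down, reboot, then wifi:

```python
def parse_sms_command(text: str):
    """Map SMS text to (command, args) using the controller's matching rules."""
    lowered = text.lower()
    if "lte up" in lowered:
        return ("lte", ["up"])
    if "lte down" in lowered:
        return ("lte", ["down"])
    if "reboot" in lowered:
        return ("reboot", [])
    if lowered.startswith("wifi "):
        # Expected format: wifi <ssid> <password>
        parts = text.split(None, 2)
        if len(parts) == 3:
            return ("wifi", parts[1:])
        return ("invalid", [])
    return (None, [])
```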
sudo chmod +x /home/user/sms_lte_control.py
sudo nano /etc/systemd/system/sms-lte-control.service
[Unit]
Description=SMS-based LTE Connection Controller
After=network.target ModemManager.service NetworkManager.service
Wants=ModemManager.service NetworkManager.service
[Service]
Type=simple
User=user
ExecStart=/usr/bin/python3 /home/user/sms_lte_control.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable sms-lte-control
Need to re-program the LEDs on the stick to indicate system power, wlan connection, and wan connection. The kernel doesn’t have any triggers for wan, so a small bash script can do what we want. The green LED flashes slowly during 4G network search, quickly once an IP address is obtained, and stays solid when there’s internet connectivity. The blue LED flashes when advertising a hotspot, and stays solid when connected to a wifi network as a client. The red LED is solid on boot (kernel default) and changes to heartbeat once rc.local loads.
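The WAN-LED decision comes down to a strict priority: internet reachable beats has-IP, which beats link-up, which beats off. A sketch as a pure Python function (booleans standing in for the script's ping and `ip` checks):

```python
def wan_led_state(has_internet: bool, has_ip: bool, link_up: bool) -> str:
    """Pick the WAN LED pattern: solid > fast_blink > slow_blink > off."""
    if has_internet:
        return "solid"       # ping to 8.8.8.8 succeeded
    if has_ip:
        return "fast_blink"  # interface has an address, no internet yet
    if link_up:
        return "slow_blink"  # modem present, still connecting
    return "off"             # no modem detected
```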
sudo apt install iw
sudo nano /usr/local/bin/led-monitor.sh
#!/bin/bash
WAN_LED_PATH="/sys/devices/platform/leds/leds/blue:wan/brightness"
WLAN_LED_PATH="/sys/devices/platform/leds/leds/green:wlan/brightness"
WAN_INTERFACE="wwan0"
WLAN_INTERFACE="wlan0"
# Counters for different check intervals
wan_check_counter=0
wan_ping_counter=0
wlan_check_counter=0
wlan_blink_counter=0
wan_slow_blink_counter=0
# Cached states
wan_state=""
wlan_state=""
has_internet=0
ping_in_progress=0
echo "LED Monitor started"
echo "WAN LED Path: $WAN_LED_PATH"
echo "WLAN LED Path: $WLAN_LED_PATH"
# Ping result file in /run (always tmpfs/RAM)
PING_RESULT="/run/led_ping_result"
while true; do
# === WAN LED Logic ===
# Start ping check every 20 iterations (5 seconds) if not already in progress
if [ $wan_ping_counter -eq 0 ] && [ $ping_in_progress -eq 0 ]; then
echo "[WAN] Starting background ping to 8.8.8.8..."
ping_in_progress=1
(
if ping -c 1 -W 1 8.8.8.8 &>/dev/null; then
echo "1" > "$PING_RESULT"
else
echo "0" > "$PING_RESULT"
fi
) &
wan_ping_counter=20 # Next ping in 5 seconds
fi
# Check if background ping completed
if [ $ping_in_progress -eq 1 ] && [ -f "$PING_RESULT" ]; then
has_internet=$(cat "$PING_RESULT")
rm -f "$PING_RESULT"
ping_in_progress=0
if [ $has_internet -eq 1 ]; then
echo "[WAN] Ping successful - has internet"
else
echo "[WAN] Ping failed - no internet"
fi
fi
# Check interface state every 4 iterations (1 second)
if [ $wan_check_counter -eq 0 ]; then
if [ $has_internet -eq 1 ]; then
wan_state="solid"
echo "[WAN] State: SOLID (internet connected)"
elif ip addr show "$WAN_INTERFACE" 2>/dev/null | grep -q "inet "; then
wan_state="fast_blink"
echo "[WAN] State: FAST_BLINK (has IP, no internet)"
elif ip link show "$WAN_INTERFACE" 2>/dev/null | grep -q "UP"; then
wan_state="slow_blink"
echo "[WAN] State: SLOW_BLINK (modem connecting)"
else
wan_state="off"
echo "[WAN] State: OFF (no modem detected)"
fi
wan_check_counter=4 # Check interface every 1 second
fi
# Handle WAN LED based on state
case "$wan_state" in
solid)
echo 1 > "$WAN_LED_PATH"
;;
fast_blink)
# Toggle every 250ms (every iteration)
current_wan=$(cat $WAN_LED_PATH)
if [ "$current_wan" = "1" ]; then
echo 0 > "$WAN_LED_PATH"
else
echo 1 > "$WAN_LED_PATH"
fi
;;
slow_blink)
# Toggle every 750ms (every 3 iterations at 250ms = 750ms)
wan_slow_blink_counter=$((wan_slow_blink_counter + 1))
if [ $wan_slow_blink_counter -ge 3 ]; then
current_wan=$(cat $WAN_LED_PATH)
if [ "$current_wan" = "1" ]; then
echo 0 > "$WAN_LED_PATH"
else
echo 1 > "$WAN_LED_PATH"
fi
wan_slow_blink_counter=0
fi
;;
off)
echo 0 > "$WAN_LED_PATH"
;;
esac
# === WLAN LED Logic ===
# Check WLAN state every 4 iterations (1 second)
if [ $wlan_check_counter -eq 0 ]; then
if iw dev "$WLAN_INTERFACE" info 2>/dev/null | grep -q "type AP"; then
wlan_state="blink"
echo "[WLAN] State: BLINK (AP/hotspot mode)"
elif ip link show "$WLAN_INTERFACE" 2>/dev/null | grep -q "UP,LOWER_UP" && \
ip addr show "$WLAN_INTERFACE" 2>/dev/null | grep -q "inet "; then
wlan_state="solid"
echo "[WLAN] State: SOLID (client connected)"
else
wlan_state="off"
echo "[WLAN] State: OFF (not connected)"
fi
wlan_check_counter=4 # Check every 1 second (4 x 250ms)
fi
# Handle WLAN LED based on state
case "$wlan_state" in
blink)
# Toggle every 1 second (every 4 iterations at 250ms = 1 second)
wlan_blink_counter=$((wlan_blink_counter + 1))
if [ $wlan_blink_counter -ge 4 ]; then
current_wlan=$(cat $WLAN_LED_PATH)
if [ "$current_wlan" = "1" ]; then
echo 0 > "$WLAN_LED_PATH"
else
echo 1 > "$WLAN_LED_PATH"
fi
wlan_blink_counter=0
fi
;;
solid)
echo 1 > "$WLAN_LED_PATH"
wlan_blink_counter=0
;;
off)
echo 0 > "$WLAN_LED_PATH"
wlan_blink_counter=0
;;
esac
# Decrement counters
[ $wan_check_counter -gt 0 ] && ((wan_check_counter--))
[ $wan_ping_counter -gt 0 ] && ((wan_ping_counter--))
[ $wlan_check_counter -gt 0 ] && ((wlan_check_counter--))
# Sleep 250ms (base interval)
sleep 0.25
done
sudo chmod +x /usr/local/bin/led-monitor.sh
sudo nano /etc/systemd/system/led-monitor.service
[Unit]
Description=WAN and WLAN LED Monitor
After=network.target
[Service]
Type=simple
ExecStart=/usr/local/bin/led-monitor.sh
Restart=always
[Install]
WantedBy=multi-user.target
sudo systemctl daemon-reload
sudo systemctl enable led-monitor
Quick guide to using an existing binary on a new stick:
High-level steps:
- Pull full image binary for recovery purposes
- See above
- Pull partition binaries and put them back in after flashing
- get into fastboot mode by holding down the button and plugging in
fastboot oem reboot-edl
for n in fsc fsg modem modemst1 modemst2 persist sec; do
edl r ${n} ${n}.bin
done
- flash the full binary of the good image
- get back into fastboot mode
for n in fsc fsg modem modemst1 modemst2 persist sec; do
fastboot flash ${n} ${n}.bin
done
fastboot reboot
- Change hostname
sudo hostnamectl set-hostname stick8
sudo nano /etc/hosts
- Change the 127.0.1.1 line to the new hostname
- Delete and re-create ssh keys for user and root
sudo rm ~/.ssh/id_rsa*
ssh-keygen
sudo su
rm ~/.ssh/id_rsa*
ssh-keygen
- Copy ssh keys to VPS
scp ~/.ssh/id_rsa.pub pi@192.168.100.98:~/Documents/root_stick21_id_rsa.pub
exit
scp ~/.ssh/id_rsa.pub pi@192.168.100.98:~/Documents/user_stick21_id_rsa.pub
- Delete, recreate and copy OVPN profile
sudo rm ~/ovpns/stick5.ovpn
pivpn add
scp ~/ovpns/stick21.ovpn pi@192.168.100.98:~/Documents/
- Adjust port in ssh-reverse.service
sudo nano /etc/systemd/system/ssh-reverse.service
Misc commands:
scp /home/pi/Documents/stick13_files/root/* user@192.168.100.1:~/sshcopy
scp /home/pi/Documents/stick13_files/user/* user@192.168.100.1:~/.ssh
ssh user@192.168.100.1
sudo su
mv /home/user/sshcopy/* ~/.ssh
chown root:root /root/.ssh/*
for n in fsc fsg modem modemst1 modemst2 persist sec; do
edl r ${n} ${n}.bin
done
for n in fsc fsg modem modemst1 modemst2 persist sec; do
edl w ${n} ${n}.bin
done
for n in fsc fsg modem modemst1 modemst2 persist sec; do
fastboot flash ${n} ${n}.bin
done
Hey,
Great post!
Sadly I didn’t pull the image for recovery, do you think you could maybe supply me your file? I use stick version 3.0. or is the content individual and yours wouldn’t work?
Sorry, just saw this. Unfortunately, if you didn’t pull the image first, the data specific to your stick is lost. If you know the IMEI, you can try restoring it using these commands: at+wrimei=111122223333555 and at+wrimei2=111122223333555. Not sure how to do that exactly, maybe this guide will help: https://techship.com/support/faq/how-to-send-at-commands-through-modem-manager/ . But yes, if you want one of my original firmwares, I can stick it on google drive for you.