{"id":363,"date":"2025-10-21T10:20:25","date_gmt":"2025-10-21T10:20:25","guid":{"rendered":"http:\/\/blog.ltzs.us\/?p=363"},"modified":"2025-11-08T06:12:53","modified_gmt":"2025-11-08T06:12:53","slug":"interacting-with-sms-ussd-on-debian","status":"publish","type":"post","link":"http:\/\/blog.ltzs.us\/?p=363","title":{"rendered":"Interacting with SMS USSD on Debian"},"content":{"rendered":"\n<p>On my 4G sticks, I&#8217;d like to have a way to know how much data is used each day and what the remaining balance is, and have that sent to me on a regular basis (and logged to the victoriametrics time-series database). Can use data to do that or SMS. But I need a way to send and parse multi-step USSD codes as well as send SMS messages and emails. I asked claude.ai to create a python script that does all that for me. After some debugging it came up with the following script. The .conf file has all the adjustable things like app-specific password for gmail, USSD codes specific to my carrier, and the phone number I want to text with weekly summaries. The schedule is managed by service files and timers.<\/p>\n\n\n\n<p><mark style=\"background-color:#fcb900\" class=\"has-inline-color\"><code>sudo nano \/etc\/sim-balance-monitor.conf<\/code><\/mark><\/p>\n\n\n\n<pre class=\"wp-block-prismatic-blocks\"><code class=\"language-\"># SIM Balance Monitor Configuration\n# File: \/etc\/sim-balance-monitor.conf\n\n[General]\n# Modem index (find with: mmcli -L)\nmodem_index = 0\n\n# Data storage paths\ndata_file = \/var\/log\/sim-balance\/balance-data.csv\npending_file = \/var\/log\/sim-balance\/pending-report.txt\nlog_file = \/var\/log\/sim-balance\/monitor.log\n\n[USSD]\n# Data balance USSD codes\ndata_init = #1234#\ndata_responses = 3,1\n\n# SMS balance USSD codes\nsms_init = #123#\nsms_responses = 0,3\n\n[Email]\n# Email recipient\nto = address@gmail.com\n\n# Email sender (optional, will use hostname if not specified)\nfrom = address@gmail.com\n\n# SMTP server settings\nsmtp_host = smtp.gmail.com\nsmtp_port = 587\nsmtp_user = address@gmail.com\nsmtp_password = &lt;app-specific password&gt;\n\n[SMS]\n# SMS recipient for weekly reports\nto = &lt;phone number&gt;<\/code><\/pre>\n\n\n\n<p>change permissions to protect the app-specific password<\/p>\n\n\n\n<p><mark style=\"background-color:#fcb900\" class=\"has-inline-color\"><code>sudo chmod 600 \/etc\/sim-balance-monitor.conf<\/code><\/mark><\/p>\n\n\n\n<p>The cron jobs look like<\/p>\n\n\n\n<p><mark style=\"background-color:#fcb900\" class=\"has-inline-color\"><code>sudo crontab -e<\/code><\/mark><\/p>\n\n\n\n<pre class=\"wp-block-prismatic-blocks\"><code class=\"language-\">0 9 * * * \/usr\/local\/bin\/check_data_balance.sh daily\n0 10 * * 1 \/usr\/local\/bin\/check_data_balance.sh weekly<\/code><\/pre>\n\n\n\n<p>Script itself is here:<\/p>\n\n\n\n<pre class=\"wp-block-prismatic-blocks\"><code class=\"language-python\">#!\/usr\/bin\/env python3\n&quot;&quot;&quot;\nSIM Balance Monitor\nChecks SIM card data balance daily and sends weekly email\/SMS reports\n&quot;&quot;&quot;\n\nimport subprocess\nimport re\nimport csv\nimport time\nimport smtplib\nimport socket\nimport os\nfrom datetime import datetime, timedelta\nfrom pathlib import Path\nfrom email.mime.text import MIMEText\nfrom email.mime.multipart import MIMEMultipart\nimport sys\nimport argparse\nimport logging\nimport configparser\n\n# Default configuration file path\nDEFAULT_CONFIG_FILE = &#039;\/etc\/sim-balance-monitor.conf&#039;\n\n# Default values if not specified in config\nDEFAULT_CONFIG = {\n    &#039;modem_index&#039;: &#039;0&#039;,\n    &#039;data_file&#039;: &#039;\/var\/log\/sim-balance\/balance-data.csv&#039;,\n    &#039;pending_file&#039;: &#039;\/var\/log\/sim-balance\/pending-report.txt&#039;,\n    &#039;log_file&#039;: &#039;\/var\/log\/sim-balance\/monitor.log&#039;,\n    &#039;ussd_init&#039;: &#039;#1234#&#039;,\n    &#039;ussd_responses&#039;: &#039;3,1&#039;,\n    &#039;sms_ussd_init&#039;: &#039;#123#&#039;,\n    &#039;sms_ussd_responses&#039;: &#039;0,3&#039;,\n}\n\ndef load_config(config_file=DEFAULT_CONFIG_FILE):\n    &quot;&quot;&quot;Load configuration from file&quot;&quot;&quot;\n    config = configparser.ConfigParser()\n    \n    if not Path(config_file).exists():\n        print(f&quot;ERROR: Configuration file not found: {config_file}&quot;, file=sys.stderr)\n        print(f&quot;Please create the configuration file at {config_file}&quot;, file=sys.stderr)\n        print(f&quot;See the installation guide for the configuration template.&quot;, file=sys.stderr)\n        sys.exit(1)\n    \n    try:\n        config.read(config_file)\n    except Exception as e:\n        print(f&quot;ERROR: Failed to parse configuration file: {e}&quot;, file=sys.stderr)\n        sys.exit(1)\n    \n    # Build configuration dictionary\n    cfg = {}\n    \n    # General settings\n    cfg[&#039;modem_index&#039;] = int(config.get(&#039;General&#039;, &#039;modem_index&#039;, fallback=DEFAULT_CONFIG[&#039;modem_index&#039;]))\n    cfg[&#039;data_file&#039;] = config.get(&#039;General&#039;, &#039;data_file&#039;, fallback=DEFAULT_CONFIG[&#039;data_file&#039;])\n    cfg[&#039;pending_file&#039;] = config.get(&#039;General&#039;, &#039;pending_file&#039;, fallback=DEFAULT_CONFIG[&#039;pending_file&#039;])\n    cfg[&#039;log_file&#039;] = config.get(&#039;General&#039;, &#039;log_file&#039;, fallback=DEFAULT_CONFIG[&#039;log_file&#039;])\n    \n    # USSD codes\n    cfg[&#039;ussd_init&#039;] = config.get(&#039;USSD&#039;, &#039;data_init&#039;, fallback=DEFAULT_CONFIG[&#039;ussd_init&#039;])\n    cfg[&#039;ussd_responses&#039;] = config.get(&#039;USSD&#039;, &#039;data_responses&#039;, fallback=DEFAULT_CONFIG[&#039;ussd_responses&#039;]).split(&#039;,&#039;)\n    cfg[&#039;sms_ussd_init&#039;] = config.get(&#039;USSD&#039;, &#039;sms_init&#039;, fallback=DEFAULT_CONFIG[&#039;sms_ussd_init&#039;])\n    cfg[&#039;sms_ussd_responses&#039;] = config.get(&#039;USSD&#039;, &#039;sms_responses&#039;, fallback=DEFAULT_CONFIG[&#039;sms_ussd_responses&#039;]).split(&#039;,&#039;)\n    \n    # Email settings\n    cfg[&#039;email_to&#039;] = config.get(&#039;Email&#039;, &#039;to&#039;, fallback=&#039;&#039;)\n    cfg[&#039;email_from&#039;] = config.get(&#039;Email&#039;, &#039;from&#039;, fallback=f&#039;sim-monitor@{socket.gethostname()}&#039;)\n    cfg[&#039;smtp_host&#039;] = config.get(&#039;Email&#039;, &#039;smtp_host&#039;, fallback=&#039;smtp.gmail.com&#039;)\n    cfg[&#039;smtp_port&#039;] = int(config.get(&#039;Email&#039;, &#039;smtp_port&#039;, fallback=&#039;587&#039;))\n    cfg[&#039;smtp_user&#039;] = config.get(&#039;Email&#039;, &#039;smtp_user&#039;, fallback=&#039;&#039;)\n    cfg[&#039;smtp_password&#039;] = config.get(&#039;Email&#039;, &#039;smtp_password&#039;, fallback=&#039;&#039;)\n    \n    # SMS settings\n    cfg[&#039;sms_to&#039;] = config.get(&#039;SMS&#039;, &#039;to&#039;, fallback=&#039;&#039;)\n    \n    return cfg\n\n# Global config variable\nCONFIG = None\n\n# Setup logging\ndef setup_logging():\n    log_dir = Path(CONFIG[&#039;log_file&#039;]).parent\n    log_dir.mkdir(parents=True, exist_ok=True)\n    \n    # Set log level based on environment variable\n    log_level = logging.DEBUG if os.getenv(&#039;DEBUG&#039;) else logging.INFO\n    \n    logging.basicConfig(\n        level=log_level,\n        format=&#039;[%(asctime)s] %(levelname)s: %(message)s&#039;,\n        datefmt=&#039;%Y-%m-%d %H:%M:%S&#039;,\n        handlers=[\n            logging.FileHandler(CONFIG[&#039;log_file&#039;]),\n            logging.StreamHandler()\n        ]\n    )\n\nclass USSDSession:\n    &quot;&quot;&quot;Manages USSD communication with the modem&quot;&quot;&quot;\n    \n    def __init__(self, modem_index):\n        self.modem_index = modem_index\n    \n    def _run_mmcli(self, args, timeout=30):\n        &quot;&quot;&quot;Run mmcli command and return output&quot;&quot;&quot;\n        cmd = [&#039;mmcli&#039;, &#039;-m&#039;, str(self.modem_index)] + args\n        try:\n            result = subprocess.run(\n                cmd,\n                capture_output=True,\n                text=True,\n                timeout=timeout\n            )\n            return result.stdout, result.stderr, result.returncode\n        except subprocess.TimeoutExpired:\n            logging.error(f&quot;Command timeout: {&#039; &#039;.join(cmd)}&quot;)\n            return None, &quot;Timeout&quot;, 1\n    \n    def cancel(self):\n        &quot;&quot;&quot;Cancel any active USSD session&quot;&quot;&quot;\n        self._run_mmcli([&#039;--3gpp-ussd-cancel&#039;])\n    \n    def initiate(self, code):\n        &quot;&quot;&quot;Initiate USSD session&quot;&quot;&quot;\n        logging.info(f&quot;Initiating USSD: {code}&quot;)\n        stdout, stderr, rc = self._run_mmcli([&#039;--3gpp-ussd-initiate&#039;, code])\n        \n        # Log full output for debugging\n        if stdout:\n            logging.debug(f&quot;STDOUT: {stdout}&quot;)\n        if stderr:\n            logging.debug(f&quot;STDERR: {stderr}&quot;)\n        \n        if rc != 0:\n            logging.error(f&quot;USSD initiate failed: {stderr}&quot;)\n            return None\n        \n        response = self._extract_response(stdout)\n        if not response:\n            # Sometimes response comes later, try checking status\n            logging.info(&quot;No immediate response, checking USSD status...&quot;)\n            time.sleep(2)\n            stdout, stderr, rc = self._run_mmcli([&#039;--3gpp-ussd-status&#039;])\n            if stdout:\n                logging.debug(f&quot;Status output: {stdout}&quot;)\n                response = self._extract_response(stdout)\n        \n        return response\n    \n    def respond(self, response):\n        &quot;&quot;&quot;Respond to USSD prompt&quot;&quot;&quot;\n        logging.info(f&quot;Responding: {response}&quot;)\n        stdout, stderr, rc = self._run_mmcli([&#039;--3gpp-ussd-respond&#039;, response])\n        \n        # Log full output for debugging\n        if stdout:\n            logging.debug(f&quot;STDOUT: {stdout}&quot;)\n        if stderr:\n            logging.debug(f&quot;STDERR: {stderr}&quot;)\n        \n        if rc != 0:\n            logging.error(f&quot;USSD respond failed: {stderr}&quot;)\n            return None\n        \n        result = self._extract_response(stdout)\n        if not result:\n            # Try checking status\n            logging.info(&quot;No immediate response, checking USSD status...&quot;)\n            time.sleep(2)\n            stdout, stderr, rc = self._run_mmcli([&#039;--3gpp-ussd-status&#039;])\n            if stdout:\n                logging.debug(f&quot;Status output: {stdout}&quot;)\n                result = self._extract_response(stdout)\n        \n        return result\n    \n    def _extract_response(self, output):\n        &quot;&quot;&quot;Extract response text from mmcli output&quot;&quot;&quot;\n        if not output:\n            logging.warning(&quot;No output from mmcli command&quot;)\n            return None\n        \n        # Log the raw output for debugging\n        logging.debug(f&quot;Raw mmcli output: {output}&quot;)\n        \n        # First try to extract from &#039;network request:&#039; (multi-line format from --3gpp-ussd-status)\n        if &#039;network request:&#039; in output:\n            lines = output.split(&#039;\\n&#039;)\n            response_lines = []\n            capture = False\n            \n            for line in lines:\n                if &#039;network request:&#039; in line:\n                    # Start capturing from this line\n                    capture = True\n                    # Get the text after &quot;network request:&quot;\n                    parts = line.split(&#039;network request:&#039;, 1)\n                    if len(parts) &gt; 1 and parts[1].strip():\n                        response_lines.append(parts[1].strip())\n                elif capture:\n                    # Continue capturing indented lines\n                    if line.strip() and (line.startswith(&#039;            |&#039;) or line.startswith(&#039;           |&#039;)):\n                        # Remove the leading pipe and whitespace\n                        text = line.split(&#039;|&#039;, 1)\n                        if len(text) &gt; 1:\n                            response_lines.append(text[1].strip())\n                    elif &#039;network notification:&#039; in line or not line.strip().startswith(&#039;|&#039;):\n                        # Stop capturing when we hit another field or unindented line\n                        break\n            \n            if response_lines:\n                response = &#039; &#039;.join(response_lines)\n                logging.info(f&quot;Extracted response from &#039;network request&#039;: {response[:150]}...&quot;)\n                return response\n        \n        # Try to extract from &#039;new reply from network:&#039; (single-line format from initiate\/respond)\n        match = re.search(r&quot;new reply from network: &#039;(.+?)&#039;&quot;, output, re.MULTILINE | re.DOTALL)\n        if match:\n            response = match.group(1).strip()\n            # Clean up the response (remove extra whitespace, newlines)\n            response = &#039; &#039;.join(response.split())\n            logging.info(f&quot;Extracted response from &#039;new reply&#039;: {response[:150]}...&quot;)\n            return response\n        \n        # Try other legacy patterns\n        patterns = [\n            r&quot;response: &#039;(.+?)&#039;&quot;,\n            r&#039;response: &quot;(.+?)&quot;&#039;,\n        ]\n        \n        for pattern in patterns:\n            match = re.search(pattern, output, re.MULTILINE | re.DOTALL)\n            if match:\n                response = match.group(1).strip()\n                response = &#039; &#039;.join(response.split())\n                logging.info(f&quot;Extracted response: {response[:150]}...&quot;)\n                return response\n        \n        # If no pattern matches, log the full output\n        logging.warning(f&quot;Could not extract response from output: {output[:300]}&quot;)\n        return None\n\nclass BalanceChecker:\n    &quot;&quot;&quot;Handles balance checking and data storage&quot;&quot;&quot;\n    \n    def __init__(self):\n        self.data_file = Path(CONFIG[&#039;data_file&#039;])\n        self.data_file.parent.mkdir(parents=True, exist_ok=True)\n        self.ussd = USSDSession(CONFIG[&#039;modem_index&#039;])\n    \n    def check_data_balance(self):\n        &quot;&quot;&quot;Perform USSD sequence to check data balance&quot;&quot;&quot;\n        logging.info(&quot;Starting data balance check&quot;)\n        \n        # Cancel any existing session\n        self.ussd.cancel()\n        time.sleep(2)\n        \n        # Initial USSD\n        response = self.ussd.initiate(CONFIG[&#039;ussd_init&#039;])\n        if not response:\n            logging.error(&quot;No response from initial USSD&quot;)\n            return None\n        logging.info(f&quot;Data Response 1: {response}&quot;)\n        \n        # Follow-up responses\n        for i, resp_code in enumerate(CONFIG[&#039;ussd_responses&#039;], 2):\n            time.sleep(3)\n            response = self.ussd.respond(resp_code)\n            if not response:\n                logging.error(f&quot;No response from data step {i}&quot;)\n                self.ussd.cancel()\n                return None\n            logging.info(f&quot;Data Response {i}: {response}&quot;)\n        \n        self.ussd.cancel()\n        return response\n    \n    def check_sms_balance(self):\n        &quot;&quot;&quot;Perform USSD sequence to check SMS balance&quot;&quot;&quot;\n        logging.info(&quot;Starting SMS balance check&quot;)\n        \n        # Cancel any existing session\n        self.ussd.cancel()\n        time.sleep(2)\n        \n        # Initial USSD\n        response = self.ussd.initiate(CONFIG[&#039;sms_ussd_init&#039;])\n        if not response:\n            logging.error(&quot;No response from initial SMS USSD&quot;)\n            return None\n        logging.info(f&quot;SMS Response 1: {response}&quot;)\n        \n        # Follow-up responses\n        for i, resp_code in enumerate(CONFIG[&#039;sms_ussd_responses&#039;], 2):\n            time.sleep(3)\n            response = self.ussd.respond(resp_code)\n            if not response:\n                logging.error(f&quot;No response from SMS step {i}&quot;)\n                self.ussd.cancel()\n                return None\n            logging.info(f&quot;SMS Response {i}: {response}&quot;)\n        \n        self.ussd.cancel()\n        return response\n    \n    def parse_data_balance(self, response):\n        &quot;&quot;&quot;Parse data balance and expiry from response&quot;&quot;&quot;\n        # Pattern: &quot;6676 Mo valables jusqu&#039;au 10\/11\/2025 \u00e0 17:51:29&quot;\n        pattern = r&#039;(\\d+)\\s*Mo\\s*valables\\s*jusqu\\&#039;au\\s*(\\d{2}\/\\d{2}\/\\d{4})\\s*\u00e0\\s*(\\d{2}:\\d{2}:\\d{2})&#039;\n        match = re.search(pattern, response)\n        \n        if match:\n            balance_mb = int(match.group(1))\n            balance_gb = round(balance_mb \/ 1024, 2)\n            expiry_date = match.group(2)\n            expiry_time = match.group(3)\n            \n            logging.info(f&quot;Data Balance: {balance_mb} MB ({balance_gb} GB), Expires: {expiry_date} {expiry_time}&quot;)\n            return {\n                &#039;balance_mb&#039;: balance_mb,\n                &#039;balance_gb&#039;: balance_gb,\n                &#039;expiry_date&#039;: expiry_date,\n                &#039;expiry_time&#039;: expiry_time\n            }\n        else:\n            logging.error(f&quot;Could not parse data balance from: {response}&quot;)\n            return None\n    \n    def parse_sms_balance(self, response):\n        &quot;&quot;&quot;Parse SMS balance and expiry from response&quot;&quot;&quot;\n        # Pattern: &quot;935 SMS, valable jusqu&#039;au 10\/11\/2025 17:51:29&quot;\n        pattern = r&#039;(\\d+)\\s*SMS.*?valable\\s*jusqu\\&#039;au\\s*(\\d{2}\/\\d{2}\/\\d{4})\\s*(\\d{2}:\\d{2}:\\d{2})&#039;\n        match = re.search(pattern, response)\n        \n        if match:\n            sms_count = int(match.group(1))\n            expiry_date = match.group(2)\n            expiry_time = match.group(3)\n            \n            logging.info(f&quot;SMS Balance: {sms_count} SMS, Expires: {expiry_date} {expiry_time}&quot;)\n            return {\n                &#039;sms_count&#039;: sms_count,\n                &#039;expiry_date&#039;: expiry_date,\n                &#039;expiry_time&#039;: expiry_time\n            }\n        else:\n            logging.error(f&quot;Could not parse SMS balance from: {response}&quot;)\n            return None\n    \n    def save_balance(self, data_info, sms_info):\n        &quot;&quot;&quot;Save balance data to CSV&quot;&quot;&quot;\n        timestamp = datetime.now().strftime(&#039;%Y-%m-%d %H:%M:%S&#039;)\n        \n        with open(self.data_file, &#039;a&#039;, newline=&#039;&#039;) as f:\n            writer = csv.writer(f)\n            if data_info and sms_info:\n                writer.writerow([\n                    timestamp,\n                    data_info[&#039;balance_mb&#039;],\n                    data_info[&#039;balance_gb&#039;],\n                    data_info[&#039;expiry_date&#039;],\n                    data_info[&#039;expiry_time&#039;],\n                    sms_info[&#039;sms_count&#039;],\n                    sms_info[&#039;expiry_date&#039;],\n                    sms_info[&#039;expiry_time&#039;]\n                ])\n            elif data_info:\n                writer.writerow([\n                    timestamp,\n                    data_info[&#039;balance_mb&#039;],\n                    data_info[&#039;balance_gb&#039;],\n                    data_info[&#039;expiry_date&#039;],\n                    data_info[&#039;expiry_time&#039;],\n                    &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;\n                ])\n            elif sms_info:\n                writer.writerow([\n                    timestamp,\n                    &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;,\n                    sms_info[&#039;sms_count&#039;],\n                    sms_info[&#039;expiry_date&#039;],\n                    sms_info[&#039;expiry_time&#039;]\n                ])\n            else:\n                writer.writerow([timestamp, &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;, &#039;ERROR&#039;])\n    \n    def run(self):\n        &quot;&quot;&quot;Run complete balance check&quot;&quot;&quot;\n        data_response = self.check_data_balance()\n        data_info = self.parse_data_balance(data_response) if data_response else None\n        \n        # Wait a bit between checks\n        time.sleep(5)\n        \n        sms_response = self.check_sms_balance()\n        sms_info = self.parse_sms_balance(sms_response) if sms_response else None\n        \n        self.save_balance(data_info, sms_info)\n        return data_info is not None or sms_info is not None\n\nclass ReportGenerator:\n    &quot;&quot;&quot;Generates reports from balance data&quot;&quot;&quot;\n    \n    def __init__(self):\n        self.data_file = Path(CONFIG[&#039;data_file&#039;])\n    \n    def get_weekly_data(self):\n        &quot;&quot;&quot;Get balance data from last 7 days&quot;&quot;&quot;\n        if not self.data_file.exists():\n            return []\n        \n        cutoff = datetime.now() - timedelta(days=7)\n        cutoff_str = cutoff.strftime(&#039;%Y-%m-%d&#039;)\n        \n        data = []\n        with open(self.data_file, &#039;r&#039;) as f:\n            reader = csv.reader(f)\n            for row in reader:\n                if row and row[0] &gt;= cutoff_str:\n                    data.append(row)\n        \n        return data\n    \n    def calculate_daily_usage(self, data):\n        &quot;&quot;&quot;Calculate average daily data usage&quot;&quot;&quot;\n        prev_balance = None\n        total_usage = 0\n        usage_days = 0\n        \n        for row in data:\n            if len(row) &gt; 1 and row[1] not in [&#039;ERROR&#039;, &#039;N\/A&#039;]:\n                balance = int(row[1])\n                if prev_balance is not None:\n                    daily_usage = prev_balance - balance\n                    if daily_usage &gt;= 0:\n                        total_usage += daily_usage\n                        usage_days += 1\n                prev_balance = balance\n        \n        return total_usage \/\/ usage_days if usage_days &gt; 0 else 0\n    \n    def calculate_daily_sms_usage(self, data):\n        &quot;&quot;&quot;Calculate average daily SMS usage&quot;&quot;&quot;\n        prev_balance = None\n        total_usage = 0\n        usage_days = 0\n        \n        for row in data:\n            if len(row) &gt; 5 and row[5] not in [&#039;ERROR&#039;, &#039;N\/A&#039;]:\n                balance = int(row[5])\n                if prev_balance is not None:\n                    daily_usage = prev_balance - balance\n                    if daily_usage &gt;= 0:\n                        total_usage += daily_usage\n                        usage_days += 1\n                prev_balance = balance\n        \n        return total_usage \/\/ usage_days if usage_days &gt; 0 else 0\n    \n    def generate_email_report(self, data):\n        &quot;&quot;&quot;Generate detailed email report&quot;&quot;&quot;\n        report = &quot;SIM Card Data Balance Report\\n&quot;\n        report += &quot;=&quot; * 80 + &quot;\\n\\n&quot;\n        report += &quot;Report Period: Last 7 Days\\n&quot;\n        report += f&quot;Generated: {datetime.now().strftime(&#039;%Y-%m-%d %H:%M:%S&#039;)}\\n\\n&quot;\n        \n        if not data:\n            report += &quot;No data collected during this period.\\n&quot;\n            return report\n        \n        report += &quot;Daily Balance History:\\n&quot;\n        report += &quot;-&quot; * 80 + &quot;\\n&quot;\n        report += f&quot;{&#039;Date&#039;:&lt;19} | {&#039;Data Balance&#039;:&lt;16} | {&#039;SMS Balance&#039;:&lt;12} | {&#039;Expiry&#039;}\\n&quot;\n        report += &quot;-&quot; * 80 + &quot;\\n&quot;\n        \n        for row in data:\n            if len(row) &gt;= 8:\n                timestamp, balance_mb, balance_gb, data_expiry_date, data_expiry_time, sms_count, sms_expiry_date, sms_expiry_time = row[:8]\n            else:\n                # Old format without SMS data\n                timestamp = row[0] if len(row) &gt; 0 else &#039;ERROR&#039;\n                balance_mb = row[1] if len(row) &gt; 1 else &#039;ERROR&#039;\n                balance_gb = row[2] if len(row) &gt; 2 else &#039;ERROR&#039;\n                data_expiry_date = row[3] if len(row) &gt; 3 else &#039;ERROR&#039;\n                data_expiry_time = row[4] if len(row) &gt; 4 else &#039;ERROR&#039;\n                sms_count = &#039;N\/A&#039;\n                sms_expiry_date = &#039;N\/A&#039;\n                sms_expiry_time = &#039;&#039;\n            \n            if balance_mb == &#039;ERROR&#039; and sms_count == &#039;ERROR&#039;:\n                report += f&quot;{timestamp:&lt;19} | {&#039;ERROR&#039;:&lt;16} | {&#039;ERROR&#039;:&lt;12} | Check logs\\n&quot;\n            else:\n                data_str = f&quot;{balance_mb} MB ({balance_gb}GB)&quot; if balance_mb != &#039;ERROR&#039; else &#039;ERROR&#039;\n                sms_str = f&quot;{sms_count} SMS&quot; if sms_count not in [&#039;ERROR&#039;, &#039;N\/A&#039;] else sms_count\n                expiry_str = f&quot;{data_expiry_date} {data_expiry_time}&quot; if data_expiry_date != &#039;ERROR&#039; else &#039;ERROR&#039;\n                report += f&quot;{timestamp:&lt;19} | {data_str:&lt;16} | {sms_str:&lt;12} | {expiry_str}\\n&quot;\n        \n        report += &quot;-&quot; * 80 + &quot;\\n\\n&quot;\n        \n        # Statistics\n        valid_data_balances = [int(row[1]) for row in data if len(row) &gt; 1 and row[1] not in [&#039;ERROR&#039;, &#039;N\/A&#039;]]\n        valid_sms_balances = [int(row[5]) for row in data if len(row) &gt; 5 and row[5] not in [&#039;ERROR&#039;, &#039;N\/A&#039;]]\n        \n        if valid_data_balances or valid_sms_balances:\n            report += &quot;Summary Statistics:\\n&quot;\n            \n            if valid_data_balances:\n                avg_data = sum(valid_data_balances) \/\/ len(valid_data_balances)\n                min_data = min(valid_data_balances)\n                max_data = max(valid_data_balances)\n                avg_usage = self.calculate_daily_usage(data)\n                \n                report += f&quot;  Data - Average Balance: {avg_data} MB\\n&quot;\n                report += f&quot;  Data - Minimum Balance: {min_data} MB\\n&quot;\n                report += f&quot;  Data - Maximum Balance: {max_data} MB\\n&quot;\n                report += f&quot;  Data - Average Daily Usage: {avg_usage} MB\/day\\n&quot;\n            \n            if valid_sms_balances:\n                avg_sms = sum(valid_sms_balances) \/\/ len(valid_sms_balances)\n                min_sms = min(valid_sms_balances)\n                max_sms = max(valid_sms_balances)\n                avg_sms_usage = self.calculate_daily_sms_usage(data)\n                \n                report += f&quot;  SMS - Average Balance: {avg_sms} SMS\\n&quot;\n                report += f&quot;  SMS - Minimum Balance: {min_sms} SMS\\n&quot;\n                report += f&quot;  SMS - Maximum Balance: {max_sms} SMS\\n&quot;\n                report += f&quot;  SMS - Average Daily Usage: {avg_sms_usage} SMS\/day\\n&quot;\n        \n        return report\n    \n    def generate_sms_report(self, data):\n        &quot;&quot;&quot;Generate compact SMS report&quot;&quot;&quot;\n        if not data:\n            return None\n        \n        sms = f&quot;SIM Wk {datetime.now().strftime(&#039;%m\/%d&#039;)}:&quot;\n        \n        last_expiry = None\n        last_sms_count = None\n        for row in data:\n            if len(row) &gt;= 8:\n                timestamp, balance_mb, _, expiry_date, _, sms_count, _, _ = row[:8]\n            else:\n                # Old format\n                timestamp = row[0] if len(row) &gt; 0 else None\n                balance_mb = row[1] if len(row) &gt; 1 else &#039;ERROR&#039;\n                expiry_date = row[3] if len(row) &gt; 3 else None\n                sms_count = &#039;N\/A&#039;\n            \n            if timestamp and balance_mb not in [&#039;ERROR&#039;, &#039;N\/A&#039;]:\n                day = timestamp.split()[0].split(&#039;-&#039;)[2]\n                sms += f&quot; {day}:{balance_mb}M&quot;\n                last_expiry = expiry_date\n                if sms_count not in [&#039;ERROR&#039;, &#039;N\/A&#039;]:\n                    last_sms_count = sms_count\n        \n        # Add data statistics\n        avg_usage = self.calculate_daily_usage(data)\n        valid_balances = [int(row[1]) for row in data if len(row) &gt; 1 and row[1] not in [&#039;ERROR&#039;, &#039;N\/A&#039;]]\n        min_balance = min(valid_balances) if valid_balances else 0\n        \n        sms += f&quot; Avg:{avg_usage}M\/d Min:{min_balance}M&quot;\n        \n        # Add SMS info if available\n        if last_sms_count:\n            sms += f&quot; SMS:{last_sms_count}&quot;\n        \n        # Add expiry\n        if last_expiry:\n            expiry_compact = &#039;\/&#039;.join(last_expiry.split(&#039;\/&#039;)[:2])\n            sms += f&quot; Exp:{expiry_compact}&quot;\n        \n        return sms\n\nclass EmailSender:\n    &quot;&quot;&quot;Handles email sending via SMTP&quot;&quot;&quot;\n    \n    def check_network(self):\n        &quot;&quot;&quot;Check if network is available&quot;&quot;&quot;\n        for host in [&#039;8.8.8.8&#039;, &#039;1.1.1.1&#039;]:\n            try:\n                socket.create_connection((host, 53), timeout=5)\n                return True\n            except OSError:\n                continue\n        return False\n    \n    def send_email(self, subject, body):\n        &quot;&quot;&quot;Send email using SMTP&quot;&quot;&quot;\n        if not self.check_network():\n            logging.warning(&quot;No network connectivity&quot;)\n            return False\n        \n        try:\n            msg = MIMEMultipart()\n            msg[&#039;From&#039;] = CONFIG[&#039;email_from&#039;]\n            msg[&#039;To&#039;] = CONFIG[&#039;email_to&#039;]\n            msg[&#039;Subject&#039;] = subject\n            msg.attach(MIMEText(body, &#039;plain&#039;))\n            \n            with smtplib.SMTP(CONFIG[&#039;smtp_host&#039;], CONFIG[&#039;smtp_port&#039;], timeout=30) as server:\n                server.starttls()\n                server.login(CONFIG[&#039;smtp_user&#039;], CONFIG[&#039;smtp_password&#039;])\n                server.send_message(msg)\n            \n            logging.info(&quot;Email sent successfully&quot;)\n            return True\n        except Exception as e:\n            logging.error(f&quot;Failed to send email: {e}&quot;)\n            return False\n\nclass SMSSender:\n    &quot;&quot;&quot;Handles SMS sending via mmcli&quot;&quot;&quot;\n    \n    def __init__(self):\n        self.modem_index = CONFIG[&#039;modem_index&#039;]\n    \n    def send_sms(self, message):\n        &quot;&quot;&quot;Send SMS using mmcli&quot;&quot;&quot;\n        logging.info(f&quot;Sending SMS ({len(message)} chars): {message}&quot;)\n        \n        # Step 1: Create the SMS\n        # Note: Use proper quoting for text with spaces\n        create_cmd = [\n            &#039;mmcli&#039;, &#039;-m&#039;, str(self.modem_index),\n            &#039;--messaging-create-sms&#039;,\n            f&#039;text=&quot;{message}&quot;,number=&quot;{CONFIG[&quot;sms_to&quot;]}&quot;&#039;\n        ]\n        \n        try:\n            result = subprocess.run(\n                create_cmd,\n                capture_output=True,\n                text=True,\n                timeout=30\n            )\n            \n            if result.returncode != 0:\n                logging.error(f&quot;Failed to create SMS: {result.stderr}&quot;)\n                return False\n            \n            # Extract SMS index from output\n            # Output looks like: &quot;Successfully created new SMS: \/org\/freedesktop\/ModemManager1\/SMS\/0&quot;\n            match = re.search(r&#039;\/SMS\/(\\d+)&#039;, result.stdout)\n            if not match:\n                logging.error(f&quot;Could not extract SMS index from output: {result.stdout}&quot;)\n                return False\n            \n            sms_index = match.group(1)\n            logging.info(f&quot;Created SMS with index: {sms_index}&quot;)\n            \n            # Step 2: Send the SMS\n            send_cmd = [\n                &#039;mmcli&#039;, &#039;-s&#039;, sms_index,\n                &#039;--send&#039;\n            ]\n            \n            result = subprocess.run(\n                send_cmd,\n                capture_output=True,\n                text=True,\n                timeout=30\n            )\n            \n            if result.returncode == 0:\n                logging.info(f&quot;SMS sent successfully to {CONFIG[&#039;sms_to&#039;]}&quot;)\n                return True\n            else:\n                logging.error(f&quot;Failed to send SMS: {result.stderr}&quot;)\n                return False\n                \n        except Exception as e:\n            logging.error(f&quot;SMS send error: {e}&quot;)\n            return False\n\nclass WeeklyReporter:\n    &quot;&quot;&quot;Manages weekly email and SMS reports&quot;&quot;&quot;\n    \n    def __init__(self):\n        self.report_gen = ReportGenerator()\n        self.email_sender = EmailSender()\n        self.sms_sender = SMSSender()\n        self.pending_file = Path(CONFIG[&#039;pending_file&#039;])\n    \n    def send_reports(self):\n        &quot;&quot;&quot;Send both email and SMS reports&quot;&quot;&quot;\n        logging.info(&quot;Starting weekly reports&quot;)\n        \n        # Get current week data\n        current_data = self.report_gen.get_weekly_data()\n        \n        # Load pending data if exists\n        pending_data = []\n        if self.pending_file.exists():\n            logging.info(&quot;Found pending data from previous week(s)&quot;)\n            with open(self.pending_file, &#039;r&#039;) as f:\n                reader = csv.reader(f)\n                pending_data = list(reader)\n        \n        # Combine data\n        all_data = pending_data + current_data\n        \n        if not all_data:\n            logging.info(&quot;No data to report&quot;)\n            return\n        \n        # Generate reports\n        email_body = self.report_gen.generate_email_report(all_data)\n        sms_text = self.report_gen.generate_sms_report(current_data)\n        \n        # Send email\n        subject = f&quot;Weekly SIM Balance Report - {datetime.now().strftime(&#039;%Y-%m-%d&#039;)}&quot;\n        if self.email_sender.send_email(subject, email_body):\n            # Clear pending data on success\n            if self.pending_file.exists():\n                self.pending_file.unlink()\n            \n            # Archive old data (keep 60 days)\n            self.archive_old_data()\n        else:\n            # Save for next week\n            logging.info(&quot;Saving data for next week&quot;)\n            with open(self.pending_file, &#039;w&#039;, newline=&#039;&#039;) as f:\n                writer = csv.writer(f)\n                writer.writerows(all_data)\n        \n        # Send SMS (fire-and-forget)\n        if sms_text:\n            self.sms_sender.send_sms(sms_text)\n    \n    def archive_old_data(self):\n        &quot;&quot;&quot;Keep only last 60 days of data&quot;&quot;&quot;\n        data_file = Path(CONFIG[&#039;data_file&#039;])\n        if not data_file.exists():\n            return\n        \n        cutoff = datetime.now() - timedelta(days=60)\n        cutoff_str = cutoff.strftime(&#039;%Y-%m-%d&#039;)\n        \n        temp_file = data_file.with_suffix(&#039;.tmp&#039;)\n        with open(data_file, &#039;r&#039;) as fin, open(temp_file, &#039;w&#039;, newline=&#039;&#039;) as fout:\n            reader = csv.reader(fin)\n            writer = csv.writer(fout)\n            for row in reader:\n                if row and row[0] &gt;= cutoff_str:\n                    writer.writerow(row)\n        \n        temp_file.replace(data_file)\n        logging.info(&quot;Archived old data&quot;)\n\ndef main():\n    global CONFIG\n    \n    parser = argparse.ArgumentParser(description=&#039;SIM Balance Monitor&#039;)\n    parser.add_argument(&#039;action&#039;, choices=[&#039;check&#039;, &#039;report&#039;], \n                       help=&#039;Action to perform: check (daily balance) or report (weekly summary)&#039;)\n    parser.add_argument(&#039;-c&#039;, &#039;--config&#039;, default=DEFAULT_CONFIG_FILE,\n                       help=f&#039;Configuration file path (default: {DEFAULT_CONFIG_FILE})&#039;)\n    args = parser.parse_args()\n    \n    # Load configuration FIRST\n    CONFIG = load_config(args.config)\n    \n    # THEN setup logging (needs CONFIG to be loaded)\n    setup_logging()\n    \n    try:\n        if args.action == &#039;check&#039;:\n            checker = BalanceChecker()\n            success = checker.run()\n            sys.exit(0 if success else 1)\n        \n        elif args.action == &#039;report&#039;:\n            reporter = WeeklyReporter()\n            reporter.send_reports()\n            sys.exit(0)\n    \n    except Exception as e:\n        logging.error(f&quot;Unexpected error: {e}&quot;, exc_info=True)\n        sys.exit(1)\n\nif __name__ == &#039;__main__&#039;:\n    main()\n<\/code><\/pre>\n\n\n\n<p>Install at \/usr\/local\/bin\/ and change permissions to allow execution.<\/p>\n","protected":false},"excerpt":{"rendered":"<p>On my 4G sticks, I&#8217;d like to have a way to know how much data is used each day and what the remaining balance is, and have that sent to me on a regular basis (and logged to the victoriametrics time-series database). Can use data to do that or SMS. But I need a way&#8230;<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[1],"tags":[],"class_list":["post-363","post","type-post","status-publish","format-standard","hentry","category-uncategorized"],"_links":{"self":[{"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=\/wp\/v2\/posts\/363","targetHints":{"allow":["GET"]}}],"collection":[{"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=\/wp\/v2\/posts"}],"about":[{"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=%2Fwp%2Fv2%2Fcomments&post=363"}],"version-history":[{"count":5,"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=\/wp\/v2\/posts\/363\/revisions"}],"predecessor-version":[{"id":377,"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=\/wp\/v2\/posts\/363\/revisions\/377"}],"wp:attachment":[{"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=%2Fwp%2Fv2%2Fmedia&parent=363"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=%2Fwp%2Fv2%2Fcategories&post=363"},{"taxonomy":"post_tag","embeddable":true,"href":"http:\/\/blog.ltzs.us\/index.php?rest_route=%2Fwp%2Fv2%2Ftags&post=363"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}