On my 4G sticks, I’d like a way to know how much data is used each day and what the remaining balance is, and to have that information sent to me on a regular basis (and logged to the VictoriaMetrics time-series database). The reports can be sent either over the data connection or by SMS. To do that, I need a way to send and parse multi-step USSD codes as well as to send SMS messages and emails. I asked claude.ai to create a Python script that does all of that for me, and after some debugging it came up with the following script. The .conf file holds everything that is adjustable, such as the app-specific password for Gmail, the USSD codes specific to my carrier, and the phone number that should receive the weekly summaries by text. The schedule is managed by service files and timers.
sudo nano /etc/sim-balance-monitor.conf
# SIM Balance Monitor Configuration
# File: /etc/sim-balance-monitor.conf
[General]
# Modem index (find with: mmcli -L)
modem_index = 0
# Data storage paths
data_file = /var/log/sim-balance/balance-data.csv
pending_file = /var/log/sim-balance/pending-report.txt
log_file = /var/log/sim-balance/monitor.log
[USSD]
# Data balance USSD codes
data_init = #1234#
data_responses = 3,1
# SMS balance USSD codes
sms_init = #123#
sms_responses = 0,3
[Email]
# Email recipient
to = address@gmail.com
# Email sender (optional, will use hostname if not specified)
from = address@gmail.com
# SMTP server settings
smtp_host = smtp.gmail.com
smtp_port = 587
smtp_user = address@gmail.com
smtp_password = <app-specific password>
[SMS]
# SMS recipient for weekly reports
to = <phone number>
Change the file permissions to protect the app-specific password:
sudo chmod 600 /etc/sim-balance-monitor.conf
The cron jobs look like this:
sudo crontab -e
0 9 * * * /usr/local/bin/check_data_balance.sh daily
0 10 * * 1 /usr/local/bin/check_data_balance.sh weekly
Script itself is here:
#!/usr/bin/env python3
"""
SIM Balance Monitor
Checks SIM card data balance daily and sends weekly email/SMS reports
"""
import subprocess
import re
import csv
import time
import smtplib
import socket
import os
from datetime import datetime, timedelta
from pathlib import Path
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import sys
import argparse
import logging
import configparser
# Default configuration file path
DEFAULT_CONFIG_FILE = '/etc/sim-balance-monitor.conf'

# Default values used when an option is missing from the config file
DEFAULT_CONFIG = {
    'modem_index': '0',
    'data_file': '/var/log/sim-balance/balance-data.csv',
    'pending_file': '/var/log/sim-balance/pending-report.txt',
    'log_file': '/var/log/sim-balance/monitor.log',
    'ussd_init': '#1234#',
    'ussd_responses': '3,1',
    'sms_ussd_init': '#123#',
    'sms_ussd_responses': '0,3',
}


def _split_codes(raw):
    """Split a comma-separated option into a list of stripped, non-empty items."""
    return [item.strip() for item in raw.split(',') if item.strip()]


def load_config(config_file=DEFAULT_CONFIG_FILE):
    """Load the monitor configuration from an INI file.

    Args:
        config_file: Path of the configuration file to read.

    Returns:
        A flat dict with the keys ``modem_index`` (int), ``data_file``,
        ``pending_file``, ``log_file``, ``ussd_init``, ``ussd_responses``
        (list of str), ``sms_ussd_init``, ``sms_ussd_responses`` (list of
        str), ``email_to``, ``email_from``, ``smtp_host``, ``smtp_port``
        (int), ``smtp_user``, ``smtp_password`` and ``sms_to``.

    The process exits with status 1 (after printing a message to stderr)
    when the file is missing, cannot be read or parsed, or holds a
    non-numeric ``modem_index`` / ``smtp_port``.
    """
    # interpolation=None: a literal '%' (e.g. in the SMTP password) must be
    # taken verbatim instead of being treated as an interpolation marker.
    config = configparser.ConfigParser(interpolation=None)

    if not Path(config_file).exists():
        print(f"ERROR: Configuration file not found: {config_file}", file=sys.stderr)
        print(f"Please create the configuration file at {config_file}", file=sys.stderr)
        print("See the installation guide for the configuration template.", file=sys.stderr)
        sys.exit(1)

    # read_file() on an explicitly opened handle is used instead of read():
    # read() silently skips files it cannot open, which would make an
    # unreadable (e.g. root-only) file look like an empty configuration.
    try:
        with open(config_file, encoding='utf-8') as handle:
            config.read_file(handle)
    except (OSError, UnicodeError) as e:
        print(f"ERROR: Failed to read configuration file: {e}", file=sys.stderr)
        sys.exit(1)
    except configparser.Error as e:
        print(f"ERROR: Failed to parse configuration file: {e}", file=sys.stderr)
        sys.exit(1)

    # Build configuration dictionary
    cfg = {}

    # Numeric settings are converted together so a typo yields one clear error
    try:
        cfg['modem_index'] = int(config.get('General', 'modem_index', fallback=DEFAULT_CONFIG['modem_index']))
        cfg['smtp_port'] = int(config.get('Email', 'smtp_port', fallback='587'))
    except ValueError as e:
        print(f"ERROR: Invalid numeric value in configuration file: {e}", file=sys.stderr)
        sys.exit(1)

    # General settings
    cfg['data_file'] = config.get('General', 'data_file', fallback=DEFAULT_CONFIG['data_file'])
    cfg['pending_file'] = config.get('General', 'pending_file', fallback=DEFAULT_CONFIG['pending_file'])
    cfg['log_file'] = config.get('General', 'log_file', fallback=DEFAULT_CONFIG['log_file'])

    # USSD codes (follow-up replies are comma-separated; blanks are dropped)
    cfg['ussd_init'] = config.get('USSD', 'data_init', fallback=DEFAULT_CONFIG['ussd_init'])
    cfg['ussd_responses'] = _split_codes(
        config.get('USSD', 'data_responses', fallback=DEFAULT_CONFIG['ussd_responses']))
    cfg['sms_ussd_init'] = config.get('USSD', 'sms_init', fallback=DEFAULT_CONFIG['sms_ussd_init'])
    cfg['sms_ussd_responses'] = _split_codes(
        config.get('USSD', 'sms_responses', fallback=DEFAULT_CONFIG['sms_ussd_responses']))

    # Email settings
    cfg['email_to'] = config.get('Email', 'to', fallback='')
    cfg['email_from'] = config.get('Email', 'from', fallback=f'sim-monitor@{socket.gethostname()}')
    cfg['smtp_host'] = config.get('Email', 'smtp_host', fallback='smtp.gmail.com')
    cfg['smtp_user'] = config.get('Email', 'smtp_user', fallback='')
    cfg['smtp_password'] = config.get('Email', 'smtp_password', fallback='')

    # SMS settings
    cfg['sms_to'] = config.get('SMS', 'to', fallback='')

    return cfg
# Global config variable: None at import time; main() replaces it with the
# dict returned by load_config() before any other component reads it.
CONFIG = None
# Setup logging
def setup_logging():
    """Configure root logging to the configured log file and a stream handler.

    Reads ``CONFIG['log_file']``, so the configuration must be loaded first.
    The log directory is created when missing. The level is DEBUG when the
    ``DEBUG`` environment variable is set to a non-empty value, INFO
    otherwise.
    """
    log_dir = Path(CONFIG['log_file']).parent
    log_dir.mkdir(parents=True, exist_ok=True)

    # Set log level based on environment variable
    log_level = logging.DEBUG if os.getenv('DEBUG') else logging.INFO

    logging.basicConfig(
        level=log_level,
        format='[%(asctime)s] %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        handlers=[
            # Explicit UTF-8: logged USSD replies contain non-ASCII text
            # and must not depend on the locale of the calling environment.
            logging.FileHandler(CONFIG['log_file'], encoding='utf-8'),
            logging.StreamHandler(),
        ],
    )
class USSDSession:
    """Manages USSD communication with the modem through the ``mmcli`` tool."""

    # Seconds to wait before polling --3gpp-ussd-status when a command
    # produced no reply text of its own.
    STATUS_POLL_DELAY = 2

    def __init__(self, modem_index):
        """Remember which modem index is passed to ``mmcli -m``."""
        self.modem_index = modem_index

    def _run_mmcli(self, args, timeout=30):
        """Run ``mmcli -m <index> <args...>``.

        Returns:
            A ``(stdout, stderr, returncode)`` tuple. On timeout, or when the
            command cannot be started at all, ``stdout`` is None and the
            return code is 1.
        """
        cmd = ['mmcli', '-m', str(self.modem_index)] + args
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            logging.error(f"Command timeout: {' '.join(cmd)}")
            return None, "Timeout", 1
        except OSError as e:
            # e.g. mmcli is not installed or not executable
            logging.error(f"Could not run mmcli: {e}")
            return None, str(e), 1
        return result.stdout, result.stderr, result.returncode

    def cancel(self):
        """Cancel any active USSD session (the outcome is ignored)."""
        self._run_mmcli(['--3gpp-ussd-cancel'])

    def initiate(self, code):
        """Initiate a USSD session with *code*; return the reply text or None."""
        logging.info(f"Initiating USSD: {code}")
        return self._exchange('--3gpp-ussd-initiate', code, 'initiate')

    def respond(self, response):
        """Send *response* to the current USSD prompt; return the reply or None."""
        logging.info(f"Responding: {response}")
        return self._exchange('--3gpp-ussd-respond', response, 'respond')

    def _exchange(self, option, value, action):
        """Send one USSD command and return the network's reply text.

        When the command succeeds but prints no recognisable reply, the
        USSD status is polled once after a short delay. Returns None when
        the command fails or no reply can be extracted.
        """
        stdout, stderr, rc = self._run_mmcli([option, value])

        # Log full output for debugging
        if stdout:
            logging.debug(f"STDOUT: {stdout}")
        if stderr:
            logging.debug(f"STDERR: {stderr}")

        if rc != 0:
            logging.error(f"USSD {action} failed: {stderr}")
            return None

        reply = self._extract_response(stdout)
        if not reply:
            # Sometimes the reply only shows up in the status output
            logging.info("No immediate response, checking USSD status...")
            time.sleep(self.STATUS_POLL_DELAY)
            stdout, _stderr, _rc = self._run_mmcli(['--3gpp-ussd-status'])
            if stdout:
                logging.debug(f"Status output: {stdout}")
            reply = self._extract_response(stdout)
        return reply

    def _extract_response(self, output):
        """Extract the network's reply text from mmcli output.

        Three formats are tried in order: a ``network request:`` field with
        optional ``|``-prefixed continuation lines, a quoted
        ``new reply from network: '...'`` message, and a quoted
        ``response: ...`` message. Whitespace runs (including newlines) in
        the reply are collapsed to single spaces.

        Returns:
            The reply text, or None when nothing could be extracted.
        """
        if not output:
            logging.warning("No output from mmcli command")
            return None

        # Log the raw output for debugging
        logging.debug(f"Raw mmcli output: {output}")

        # First try 'network request:' (multi-line format from --3gpp-ussd-status)
        if 'network request:' in output:
            response_lines = []
            capture = False
            for line in output.split('\n'):
                if 'network request:' in line:
                    capture = True
                    tail = line.split('network request:', 1)[1].strip()
                    if tail:
                        response_lines.append(tail)
                elif capture:
                    stripped = line.strip()
                    # The next field or a line without the '|' gutter ends
                    # the reply. This is checked before the continuation
                    # test so the following field is never swallowed.
                    if 'network notification:' in line or not stripped.startswith('|'):
                        break
                    # Continuation line: drop the gutter, whatever its indent
                    text = stripped[1:].strip()
                    if text:
                        response_lines.append(text)
            if response_lines:
                response = ' '.join(response_lines)
                logging.info(f"Extracted response from 'network request': {response[:150]}...")
                return response

        # Quoted formats. The match is greedy up to the LAST quote so that
        # apostrophes inside the reply (e.g. "jusqu'au") do not truncate it.
        match = re.search(r"new reply from network: '(.+)'", output, re.DOTALL)
        if match:
            response = ' '.join(match.group(1).split())
            logging.info(f"Extracted response from 'new reply': {response[:150]}...")
            return response

        # Try other legacy patterns
        for pattern in (r"response: '(.+)'", r'response: "(.+)"'):
            match = re.search(pattern, output, re.DOTALL)
            if match:
                response = ' '.join(match.group(1).split())
                logging.info(f"Extracted response: {response[:150]}...")
                return response

        # If no pattern matches, log the (truncated) output
        logging.warning(f"Could not extract response from output: {output[:300]}")
        return None
class BalanceChecker:
    """Handles balance checking over USSD and storage of the results as CSV."""

    # Written to the CSV in place of every value that could not be obtained
    ERROR_MARK = 'ERROR'

    # "6676 Mo valables jusqu'au 10/11/2025 à 17:51:29"
    # Both the ASCII and the typographic apostrophe are accepted.
    DATA_PATTERN = re.compile(
        r"(\d+)\s*Mo\s*valables\s*jusqu['’]au\s*(\d{2}/\d{2}/\d{4})\s*à\s*(\d{2}:\d{2}:\d{2})"
    )
    # "935 SMS, valable jusqu'au 10/11/2025 17:51:29"
    SMS_PATTERN = re.compile(
        r"(\d+)\s*SMS.*?valable\s*jusqu['’]au\s*(\d{2}/\d{2}/\d{4})\s*(\d{2}:\d{2}:\d{2})"
    )

    def __init__(self):
        """Create the data directory if needed and open a USSD session helper."""
        self.data_file = Path(CONFIG['data_file'])
        self.data_file.parent.mkdir(parents=True, exist_ok=True)
        self.ussd = USSDSession(CONFIG['modem_index'])

    def _run_ussd_sequence(self, init_code, follow_ups, title, step_name, init_error):
        """Run one multi-step USSD dialogue and return the final reply.

        Args:
            init_code: USSD code that opens the dialogue.
            follow_ups: Replies to send, in order, to the successive prompts.
            title: Label used in the per-step info log lines.
            step_name: Label used in the per-step error log lines.
            init_error: Message logged when the opening code gets no reply.

        Returns:
            The text of the last reply, or None when any step got no reply.
        """
        # Cancel any existing session
        self.ussd.cancel()
        time.sleep(2)

        response = self.ussd.initiate(init_code)
        if not response:
            logging.error(init_error)
            return None
        logging.info(f"{title} Response 1: {response}")

        # Follow-up responses (numbered from 2, the initial code being step 1)
        for i, resp_code in enumerate(follow_ups, 2):
            time.sleep(3)
            response = self.ussd.respond(resp_code)
            if not response:
                logging.error(f"No response from {step_name} step {i}")
                self.ussd.cancel()
                return None
            logging.info(f"{title} Response {i}: {response}")

        self.ussd.cancel()
        return response

    def check_data_balance(self):
        """Perform the USSD sequence for the data balance; return the final reply or None."""
        logging.info("Starting data balance check")
        return self._run_ussd_sequence(
            CONFIG['ussd_init'],
            CONFIG['ussd_responses'],
            'Data',
            'data',
            "No response from initial USSD",
        )

    def check_sms_balance(self):
        """Perform the USSD sequence for the SMS balance; return the final reply or None."""
        logging.info("Starting SMS balance check")
        return self._run_ussd_sequence(
            CONFIG['sms_ussd_init'],
            CONFIG['sms_ussd_responses'],
            'SMS',
            'SMS',
            "No response from initial SMS USSD",
        )

    def parse_data_balance(self, response):
        """Parse data balance and expiry from a USSD reply.

        Returns:
            A dict with ``balance_mb`` (int), ``balance_gb`` (float, MB/1024
            rounded to two decimals), ``expiry_date`` and ``expiry_time``
            (strings as found in the reply), or None when the reply is empty
            or does not match the expected wording.
        """
        match = self.DATA_PATTERN.search(response) if response else None
        if not match:
            logging.error(f"Could not parse data balance from: {response}")
            return None

        balance_mb = int(match.group(1))
        balance_gb = round(balance_mb / 1024, 2)
        expiry_date = match.group(2)
        expiry_time = match.group(3)
        logging.info(f"Data Balance: {balance_mb} MB ({balance_gb} GB), Expires: {expiry_date} {expiry_time}")
        return {
            'balance_mb': balance_mb,
            'balance_gb': balance_gb,
            'expiry_date': expiry_date,
            'expiry_time': expiry_time,
        }

    def parse_sms_balance(self, response):
        """Parse SMS balance and expiry from a USSD reply.

        Returns:
            A dict with ``sms_count`` (int), ``expiry_date`` and
            ``expiry_time`` (strings as found in the reply), or None when
            the reply is empty or does not match the expected wording.
        """
        match = self.SMS_PATTERN.search(response) if response else None
        if not match:
            logging.error(f"Could not parse SMS balance from: {response}")
            return None

        sms_count = int(match.group(1))
        expiry_date = match.group(2)
        expiry_time = match.group(3)
        logging.info(f"SMS Balance: {sms_count} SMS, Expires: {expiry_date} {expiry_time}")
        return {
            'sms_count': sms_count,
            'expiry_date': expiry_date,
            'expiry_time': expiry_time,
        }

    def save_balance(self, data_info, sms_info):
        """Append one row to the CSV data file.

        Row layout: timestamp, balance_mb, balance_gb, data expiry date,
        data expiry time, sms_count, SMS expiry date, SMS expiry time.
        Every column belonging to a missing (None) *data_info* or
        *sms_info* is written as ``ERROR``.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        if data_info:
            data_cols = [data_info[key] for key in
                         ('balance_mb', 'balance_gb', 'expiry_date', 'expiry_time')]
        else:
            data_cols = [self.ERROR_MARK] * 4

        if sms_info:
            sms_cols = [sms_info[key] for key in ('sms_count', 'expiry_date', 'expiry_time')]
        else:
            sms_cols = [self.ERROR_MARK] * 3

        with open(self.data_file, 'a', newline='', encoding='utf-8') as f:
            csv.writer(f).writerow([timestamp, *data_cols, *sms_cols])

    def run(self):
        """Run the complete balance check and record the outcome.

        Returns:
            True when at least one of the two balances was obtained and
            parsed, False otherwise. A CSV row is written in either case.
        """
        data_response = self.check_data_balance()
        data_info = self.parse_data_balance(data_response) if data_response else None

        # Wait a bit between checks
        time.sleep(5)

        sms_response = self.check_sms_balance()
        sms_info = self.parse_sms_balance(sms_response) if sms_response else None

        self.save_balance(data_info, sms_info)
        return data_info is not None or sms_info is not None
class ReportGenerator:
    """Generates email and SMS reports from the recorded balance rows.

    Rows are lists of strings in the CSV layout: timestamp, balance_mb,
    balance_gb, data expiry date, data expiry time, sms_count, SMS expiry
    date, SMS expiry time. Shorter rows are treated as an older format
    without SMS columns.
    """

    # Placeholder strings that stand for "no value" in a CSV cell
    MISSING = ('ERROR', 'N/A')

    def __init__(self):
        """Remember the location of the CSV data file."""
        self.data_file = Path(CONFIG['data_file'])

    def get_weekly_data(self):
        """Return the rows whose timestamp is on or after the date 7 days ago."""
        if not self.data_file.exists():
            return []

        # Timestamps start with YYYY-MM-DD, so comparing strings orders them
        cutoff_str = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
        with open(self.data_file, 'r', newline='') as f:
            return [row for row in csv.reader(f) if row and row[0] >= cutoff_str]

    def _column_values(self, data, index):
        """Return column *index* as ints, skipping absent, placeholder or non-numeric cells."""
        values = []
        for row in data:
            if len(row) <= index or row[index] in self.MISSING:
                continue
            try:
                values.append(int(row[index]))
            except ValueError:
                # A corrupted cell must not abort the whole report
                continue
        return values

    @staticmethod
    def _average_drop(balances):
        """Return the floor-averaged decrease between consecutive balances.

        Increases (e.g. after a top-up) are left out of the average.
        Returns 0 when there is no decrease to average.
        """
        drops = [prev - cur for prev, cur in zip(balances, balances[1:]) if prev - cur >= 0]
        return sum(drops) // len(drops) if drops else 0

    def calculate_daily_usage(self, data):
        """Calculate average data usage (MB) between consecutive valid rows."""
        return self._average_drop(self._column_values(data, 1))

    def calculate_daily_sms_usage(self, data):
        """Calculate average SMS usage between consecutive valid rows."""
        return self._average_drop(self._column_values(data, 5))

    def generate_email_report(self, data):
        """Generate the detailed plain-text email report for *data*.

        The report has a header, one table line per row and, when at least
        one numeric balance is present, summary statistics.
        """
        parts = [
            "SIM Card Data Balance Report\n",
            "=" * 80 + "\n\n",
            "Report Period: Last 7 Days\n",
            f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n",
        ]

        if not data:
            parts.append("No data collected during this period.\n")
            return "".join(parts)

        rule = "-" * 80 + "\n"
        parts.append("Daily Balance History:\n")
        parts.append(rule)
        parts.append(f"{'Date':<19} | {'Data Balance':<16} | {'SMS Balance':<12} | {'Expiry'}\n")
        parts.append(rule)

        for row in data:
            if len(row) >= 8:
                (timestamp, balance_mb, balance_gb, data_expiry_date,
                 data_expiry_time, sms_count, _sms_expiry_date, _sms_expiry_time) = row[:8]
            else:
                # Old format without SMS data
                timestamp = row[0] if len(row) > 0 else 'ERROR'
                balance_mb = row[1] if len(row) > 1 else 'ERROR'
                balance_gb = row[2] if len(row) > 2 else 'ERROR'
                data_expiry_date = row[3] if len(row) > 3 else 'ERROR'
                data_expiry_time = row[4] if len(row) > 4 else 'ERROR'
                sms_count = 'N/A'

            if balance_mb == 'ERROR' and sms_count == 'ERROR':
                parts.append(f"{timestamp:<19} | {'ERROR':<16} | {'ERROR':<12} | Check logs\n")
                continue

            data_str = f"{balance_mb} MB ({balance_gb}GB)" if balance_mb != 'ERROR' else 'ERROR'
            sms_str = f"{sms_count} SMS" if sms_count not in self.MISSING else sms_count
            expiry_str = f"{data_expiry_date} {data_expiry_time}" if data_expiry_date != 'ERROR' else 'ERROR'
            parts.append(f"{timestamp:<19} | {data_str:<16} | {sms_str:<12} | {expiry_str}\n")

        parts.append("-" * 80 + "\n\n")

        # Statistics
        valid_data_balances = self._column_values(data, 1)
        valid_sms_balances = self._column_values(data, 5)

        if valid_data_balances or valid_sms_balances:
            parts.append("Summary Statistics:\n")

        if valid_data_balances:
            avg_data = sum(valid_data_balances) // len(valid_data_balances)
            parts.append(f" Data - Average Balance: {avg_data} MB\n")
            parts.append(f" Data - Minimum Balance: {min(valid_data_balances)} MB\n")
            parts.append(f" Data - Maximum Balance: {max(valid_data_balances)} MB\n")
            parts.append(f" Data - Average Daily Usage: {self._average_drop(valid_data_balances)} MB/day\n")

        if valid_sms_balances:
            avg_sms = sum(valid_sms_balances) // len(valid_sms_balances)
            parts.append(f" SMS - Average Balance: {avg_sms} SMS\n")
            parts.append(f" SMS - Minimum Balance: {min(valid_sms_balances)} SMS\n")
            parts.append(f" SMS - Maximum Balance: {max(valid_sms_balances)} SMS\n")
            parts.append(f" SMS - Average Daily Usage: {self._average_drop(valid_sms_balances)} SMS/day\n")

        return "".join(parts)

    def generate_sms_report(self, data):
        """Generate the compact one-line SMS report, or None when *data* is empty.

        Format: ``SIM Wk MM/DD:`` followed by `` DD:<MB>M`` per row with a
        data balance, then `` Avg:<n>M/d Min:<n>M``, then `` SMS:<count>``
        and `` Exp:<DD/MM>`` when those values are known.
        """
        if not data:
            return None

        sms = f"SIM Wk {datetime.now().strftime('%m/%d')}:"
        last_expiry = None
        last_sms_count = None

        for row in data:
            if len(row) >= 8:
                timestamp, balance_mb, _, expiry_date, _, sms_count, _, _ = row[:8]
            else:
                # Old format
                timestamp = row[0] if len(row) > 0 else None
                balance_mb = row[1] if len(row) > 1 else 'ERROR'
                expiry_date = row[3] if len(row) > 3 else None
                sms_count = 'N/A'

            if timestamp and balance_mb not in self.MISSING:
                # Day of month from 'YYYY-MM-DD ...'; a timestamp in any
                # other shape is shown as-is instead of raising IndexError.
                date_fields = timestamp.split()[0].split('-') if timestamp.split() else []
                day = date_fields[2] if len(date_fields) > 2 else timestamp
                sms += f" {day}:{balance_mb}M"
                last_expiry = expiry_date

            if sms_count not in self.MISSING:
                last_sms_count = sms_count

        # Add data statistics
        valid_balances = self._column_values(data, 1)
        avg_usage = self._average_drop(valid_balances)
        min_balance = min(valid_balances) if valid_balances else 0
        sms += f" Avg:{avg_usage}M/d Min:{min_balance}M"

        # Add SMS info if available
        if last_sms_count:
            sms += f" SMS:{last_sms_count}"

        # Add expiry (first two '/'-separated fields of the date only)
        if last_expiry:
            expiry_compact = '/'.join(last_expiry.split('/')[:2])
            sms += f" Exp:{expiry_compact}"

        return sms
class EmailSender:
    """Handles email sending via SMTP with STARTTLS."""

    def check_network(self):
        """Return True when a TCP connection to port 53 of a probe host succeeds."""
        for host in ('8.8.8.8', '1.1.1.1'):
            try:
                # The with-block closes the probe socket instead of leaking it
                with socket.create_connection((host, 53), timeout=5):
                    return True
            except OSError:
                continue
        return False

    def send_email(self, subject, body):
        """Send a plain-text email to the configured recipient.

        Args:
            subject: Subject line of the message.
            body: Plain-text body of the message.

        Returns:
            True when the message was handed to the SMTP server, False when
            no recipient is configured, the network probe fails, or any
            error occurs while sending (the error is logged).
        """
        if not CONFIG['email_to']:
            # Fail early with a clear message rather than after SMTP login
            logging.error("Failed to send email: no recipient configured")
            return False

        if not self.check_network():
            logging.warning("No network connectivity")
            return False

        try:
            msg = MIMEMultipart()
            msg['From'] = CONFIG['email_from']
            msg['To'] = CONFIG['email_to']
            msg['Subject'] = subject
            msg.attach(MIMEText(body, 'plain'))

            with smtplib.SMTP(CONFIG['smtp_host'], CONFIG['smtp_port'], timeout=30) as server:
                server.starttls()
                server.login(CONFIG['smtp_user'], CONFIG['smtp_password'])
                server.send_message(msg)
        except Exception as e:
            # Boundary handler: any failure means "not sent" to the caller
            logging.error(f"Failed to send email: {e}")
            return False

        logging.info("Email sent successfully")
        return True
class SMSSender:
    """Handles SMS sending via mmcli (create the message, then send it)."""

    def __init__(self):
        """Remember which modem index is passed to ``mmcli -m``."""
        self.modem_index = CONFIG['modem_index']

    @staticmethod
    def _sms_properties(message, number):
        """Build the ``text="...",number="..."`` argument for --messaging-create-sms.

        Both values are wrapped in double quotes, so a double quote inside
        the text is replaced by a single quote and removed from the number;
        otherwise it would end the quoted value early.
        """
        safe_text = message.replace('"', "'")
        safe_number = number.replace('"', '')
        return f'text="{safe_text}",number="{safe_number}"'

    def send_sms(self, message):
        """Send *message* to the configured SMS recipient.

        Returns:
            True when mmcli reports the message as sent; False when no
            recipient is configured, when creating or sending fails, or
            when mmcli cannot be run (the reason is logged).
        """
        logging.info(f"Sending SMS ({len(message)} chars): {message}")

        recipient = CONFIG['sms_to']
        if not recipient:
            logging.error("Failed to create SMS: no recipient configured")
            return False

        # Step 1: Create the SMS
        create_cmd = [
            'mmcli', '-m', str(self.modem_index),
            '--messaging-create-sms',
            self._sms_properties(message, recipient),
        ]

        try:
            result = subprocess.run(
                create_cmd,
                capture_output=True,
                text=True,
                timeout=30,
            )
            if result.returncode != 0:
                logging.error(f"Failed to create SMS: {result.stderr}")
                return False

            # Extract SMS index from output
            # Output looks like: "Successfully created new SMS: /org/freedesktop/ModemManager1/SMS/0"
            match = re.search(r'/SMS/(\d+)', result.stdout)
            if not match:
                logging.error(f"Could not extract SMS index from output: {result.stdout}")
                return False

            sms_index = match.group(1)
            logging.info(f"Created SMS with index: {sms_index}")

            # Step 2: Send the SMS
            result = subprocess.run(
                ['mmcli', '-s', sms_index, '--send'],
                capture_output=True,
                text=True,
                timeout=30,
            )
        except (subprocess.SubprocessError, OSError) as e:
            # Timeout, or mmcli missing / not executable
            logging.error(f"SMS send error: {e}")
            return False

        if result.returncode != 0:
            logging.error(f"Failed to send SMS: {result.stderr}")
            return False

        logging.info(f"SMS sent successfully to {recipient}")
        return True
class WeeklyReporter:
    """Manages weekly email and SMS reports."""

    def __init__(self):
        """Create the report generator and both senders; locate the pending file."""
        self.report_gen = ReportGenerator()
        self.email_sender = EmailSender()
        self.sms_sender = SMSSender()
        self.pending_file = Path(CONFIG['pending_file'])

    def _load_pending(self):
        """Return the rows saved after an earlier failed email, or [] if none."""
        if not self.pending_file.exists():
            return []
        logging.info("Found pending data from previous week(s)")
        with open(self.pending_file, 'r', newline='') as f:
            return list(csv.reader(f))

    @staticmethod
    def _merge_rows(pending, current):
        """Concatenate *pending* and *current*, dropping empty and repeated rows.

        The pending file and the current 7-day window can contain the same
        row (the window cutoff is a date, and a report may be re-run);
        counting it twice would distort the table and the statistics.
        Order of first appearance is kept.
        """
        seen = set()
        merged = []
        for row in [*pending, *current]:
            key = tuple(row)
            if not row or key in seen:
                continue
            seen.add(key)
            merged.append(row)
        return merged

    def send_reports(self):
        """Send the weekly email and SMS reports.

        The email covers pending rows from earlier failed attempts plus the
        current week; the SMS covers the current week only. When the email
        is sent, the pending file is removed and old rows are pruned from
        the data file; otherwise the combined rows are saved to the pending
        file for the next run. The SMS is attempted regardless of the email
        outcome.
        """
        logging.info("Starting weekly reports")

        # Get current week data
        current_data = self.report_gen.get_weekly_data()

        # Combine with pending data from earlier failed attempts
        all_data = self._merge_rows(self._load_pending(), current_data)

        if not all_data:
            logging.info("No data to report")
            return

        # Generate reports
        email_body = self.report_gen.generate_email_report(all_data)
        sms_text = self.report_gen.generate_sms_report(current_data)

        # Send email
        subject = f"Weekly SIM Balance Report - {datetime.now().strftime('%Y-%m-%d')}"
        if self.email_sender.send_email(subject, email_body):
            # Clear pending data on success
            if self.pending_file.exists():
                self.pending_file.unlink()
            # Archive old data (keep 60 days)
            self.archive_old_data()
        else:
            # Save for next week
            logging.info("Saving data for next week")
            with open(self.pending_file, 'w', newline='') as f:
                csv.writer(f).writerows(all_data)

        # Send SMS (fire-and-forget: the result is not checked)
        if sms_text:
            self.sms_sender.send_sms(sms_text)

    def archive_old_data(self):
        """Keep only the last 60 days of rows in the data file.

        Rows are copied to a temporary sibling file which then replaces
        the data file; empty rows are dropped along with the old ones.
        """
        data_file = Path(CONFIG['data_file'])
        if not data_file.exists():
            return

        # Timestamps start with YYYY-MM-DD, so comparing strings orders them
        cutoff_str = (datetime.now() - timedelta(days=60)).strftime('%Y-%m-%d')
        temp_file = data_file.with_suffix('.tmp')

        with open(data_file, 'r', newline='') as fin, open(temp_file, 'w', newline='') as fout:
            writer = csv.writer(fout)
            for row in csv.reader(fin):
                if row and row[0] >= cutoff_str:
                    writer.writerow(row)

        temp_file.replace(data_file)
        logging.info("Archived old data")
def main():
    """Command-line entry point.

    Parses the action (``check`` or ``report``) and the optional config
    path, loads the configuration into the module-level ``CONFIG``, sets
    up logging and runs the requested action. Exits with status 0 on
    success and 1 when the check obtained no balance or an unexpected
    error occurred.
    """
    global CONFIG

    arg_parser = argparse.ArgumentParser(description='SIM Balance Monitor')
    arg_parser.add_argument(
        'action',
        choices=['check', 'report'],
        help='Action to perform: check (daily balance) or report (weekly summary)',
    )
    arg_parser.add_argument(
        '-c', '--config',
        default=DEFAULT_CONFIG_FILE,
        help=f'Configuration file path (default: {DEFAULT_CONFIG_FILE})',
    )
    options = arg_parser.parse_args()

    # The configuration has to exist before logging can be set up,
    # because the log file location comes from it.
    CONFIG = load_config(options.config)
    setup_logging()

    try:
        if options.action == 'check':
            balance_checker = BalanceChecker()
            exit_status = 0 if balance_checker.run() else 1
            sys.exit(exit_status)
        elif options.action == 'report':
            weekly_reporter = WeeklyReporter()
            weekly_reporter.send_reports()
            sys.exit(0)
    except Exception as e:
        # sys.exit() raises SystemExit, which is not an Exception subclass,
        # so the exits above pass through this handler untouched.
        logging.error(f"Unexpected error: {e}", exc_info=True)
        sys.exit(1)


if __name__ == '__main__':
    main()
Install the script in /usr/local/bin/ and change its permissions to make it executable.