2025-09-02 10:09:35
#mysql simple #query
import mysql.connector
from mysql.connector import Error
import time
import logging
import datetime
import argparse
# Configure logging
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
handlers=[
logging.FileHandler('select_time.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Database configuration
node = {"host": "192.168.122.130", "port": 3306, "user": "root", "password": "12345678"}
db_name = 'provocation_db'
table_name = 'provocation_table'
default_query = f"SELECT * FROM {table_name} WHERE id > 3;"
def check_connection(node):
"""Check connection to the database"""
try:
conn = mysql.connector.connect(
host=node['host'], port=node['port'], user=node['user'], password=node['password']
)
logger.info(f"Connection to {node['host']}:{node['port']} successful")
conn.close()
return True
except Error as e:
logger.error(f"Failed to connect to {node['host']}:{node['port']}: {e}")
return False
def execute_query(query):
"""Execute the provided SQL query and measure execution time"""
try:
with mysql.connector.connect(
host=node['host'],
port=node['port'],
user=node['user'],
password=node['password'],
database=db_name,
autocommit=True
) as conn:
with conn.cursor() as cursor:
# Validate query
if not query.strip():
raise ValueError("Query cannot be empty")
# Warn for potentially dangerous queries
query_lower = query.lower().strip()
if ('delete' in query_lower or 'update' in query_lower) and 'where' not in query_lower:
logger.warning("Query appears to be a DELETE or UPDATE without a WHERE clause. This may affect all rows!")
# Measure start time
start_time = time.time()
# Execute query
cursor.execute(query)
# Handle query results
if query_lower.startswith('select'):
results = cursor.fetchall()
row_count = len(results)
# Measure end time
end_time = time.time()
duration = (end_time - start_time) * 1000 # Convert to milliseconds
# Log results
logger.info(f"Query executed: {query}")
if results:
logger.info(f"Number of rows returned: {row_count}")
for i, row in enumerate(results, 1):
logger.info(f"Result {i}: {row}")
else:
logger.info("Result: No records found")
else:
# For non-SELECT queries (INSERT, UPDATE, DELETE)
row_count = cursor.rowcount
# Measure end time
end_time = time.time()
duration = (end_time - start_time) * 1000 # Convert to milliseconds
# Log results
logger.info(f"Query executed: {query}")
logger.info(f"Affected rows: {row_count}")
logger.info(f"Execution time: {duration:.3f} ms")
return results if query_lower.startswith('select') else row_count, duration
except Error as e:
logger.error(f"Error executing query: {e}")
raise
except ValueError as e:
logger.error(f"Invalid query: {e}")
raise
def parse_arguments():
"""Parse command-line arguments for the SQL query"""
parser = argparse.ArgumentParser(description="Execute a custom SQL query and measure its execution time")
parser.add_argument('--query', type=str, default=default_query,
help=f"SQL query to execute (default: '{default_query}')")
return parser.parse_args()
if __name__ == "__main__":
try:
logger.info(f"Starting script at {datetime.datetime.now()}")
# Parse command-line arguments
args = parse_arguments()
query = args.query
# Check database connection
if not check_connection(node):
logger.error("Connection failed. Exiting.")
exit(1)
# Execute query and measure time
results, duration = execute_query(query)
logger.info(f"Script completed at {datetime.datetime.now()}")
except Exception as e:
logger.error(f"Error in main: {e}")
Back to list