LogNotes

2025-09-02 10:09:35
#mysql simple #query

import mysql.connector
from mysql.connector import Error
import time
import logging
import datetime
import argparse

# Logging setup: DEBUG-and-above records are handed to a file handler
# (select_time.log) and a stream handler at the same time.
_log_handlers = [
    logging.FileHandler('select_time.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    handlers=_log_handlers,
    format='%(asctime)s [%(levelname)s] %(threadName)s: %(message)s',
    level=logging.DEBUG,
)
# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)

# Connection settings for the target MySQL server.
# NOTE(review): the credentials are hard-coded in the source; consider moving
# them to environment variables or a config file kept out of version control.
node = dict(
    host="192.168.122.130",
    port=3306,
    user="root",
    password="12345678",
)
# Schema and table the default query runs against.
db_name = 'provocation_db'
table_name = 'provocation_table'
# Default SQL statement, built from table_name.
default_query = "SELECT * FROM {} WHERE id > 3;".format(table_name)

def check_connection(node):
    """Report whether a MySQL connection to ``node`` can be opened.

    Opens a connection using the ``host``, ``port``, ``user`` and
    ``password`` entries of ``node``, logs the outcome, and closes the
    connection again.

    Returns:
        True when the connection was opened and closed without a connector
        error; False when a ``mysql.connector`` ``Error`` was raised.
    """
    credentials = {
        'host': node['host'],
        'port': node['port'],
        'user': node['user'],
        'password': node['password'],
    }
    try:
        link = mysql.connector.connect(**credentials)
        logger.info(f"Connection to {node['host']}:{node['port']} successful")
        link.close()
    except Error as exc:
        logger.error(f"Failed to connect to {node['host']}:{node['port']}: {exc}")
        return False
    return True

def execute_query(query):
    """Execute a SQL statement against the configured database and time it.

    Connects with the module-level ``node`` and ``db_name`` settings
    (autocommit enabled), runs ``query``, and logs the statement, its
    outcome and the elapsed time.

    Args:
        query: SQL text to execute. Must contain non-whitespace characters.

    Returns:
        A ``(outcome, duration)`` tuple. ``outcome`` is the list of fetched
        rows when the statement produced a result set, otherwise the
        cursor's affected-row count. ``duration`` is the elapsed time in
        milliseconds, covering execution and (for result sets) the fetch.

    Raises:
        ValueError: If ``query`` is empty or whitespace-only.
        mysql.connector.Error: If connecting or executing the query fails.
    """
    try:
        # Validate first so an empty query never costs a database connection.
        if not query or not query.strip():
            raise ValueError("Query cannot be empty")

        # Warn for potentially dangerous queries
        query_lower = query.lower().strip()
        modifies_rows = 'delete' in query_lower or 'update' in query_lower
        if modifies_rows and 'where' not in query_lower:
            logger.warning("Query appears to be a DELETE or UPDATE without a WHERE clause. This may affect all rows!")

        with mysql.connector.connect(
            host=node['host'],
            port=node['port'],
            user=node['user'],
            password=node['password'],
            database=db_name,
            autocommit=True
        ) as conn:
            with conn.cursor() as cursor:
                # perf_counter is monotonic, so the measured interval cannot
                # be distorted by system clock adjustments.
                start_time = time.perf_counter()
                cursor.execute(query)

                # Per the DB-API, description is None when the statement
                # produced no result set. Checking it (rather than a
                # 'select' prefix) also fetches rows for SHOW, DESCRIBE,
                # EXPLAIN and WITH queries instead of leaving them unread.
                has_result_set = cursor.description is not None
                if has_result_set:
                    outcome = cursor.fetchall()
                else:
                    outcome = cursor.rowcount
                duration = (time.perf_counter() - start_time) * 1000  # Convert to milliseconds

                # Log results
                logger.info(f"Query executed: {query}")
                if not has_result_set:
                    logger.info(f"Affected rows: {outcome}")
                elif outcome:
                    logger.info(f"Number of rows returned: {len(outcome)}")
                    for i, row in enumerate(outcome, 1):
                        logger.info(f"Result {i}: {row}")
                else:
                    logger.info("Result: No records found")

                logger.info(f"Execution time: {duration:.3f} ms")
                return outcome, duration
    except Error as e:
        logger.error(f"Error executing query: {e}")
        raise
    except ValueError as e:
        logger.error(f"Invalid query: {e}")
        raise

def parse_arguments():
    """Build the command-line parser and return the parsed arguments.

    The only option is ``--query``, a string that falls back to the
    module-level ``default_query`` when it is not given.
    """
    cli = argparse.ArgumentParser(
        description="Execute a custom SQL query and measure its execution time",
    )
    cli.add_argument(
        '--query',
        default=default_query,
        type=str,
        help=f"SQL query to execute (default: '{default_query}')",
    )
    parsed = cli.parse_args()
    return parsed

# Script entry point: parse the CLI, verify connectivity, run the query.
# The process exits with status 1 on any failure so that shells and
# schedulers can tell a failed run from a successful one.
if __name__ == "__main__":
    try:
        logger.info(f"Starting script at {datetime.datetime.now()}")

        # Parse command-line arguments
        args = parse_arguments()
        query = args.query

        # Check database connection
        if not check_connection(node):
            logger.error("Connection failed. Exiting.")
            # SystemExit is not an Exception subclass, so the handler below
            # does not intercept it.
            raise SystemExit(1)

        # Execute query and measure time (results and timing are logged
        # inside execute_query)
        execute_query(query)

        logger.info(f"Script completed at {datetime.datetime.now()}")
    except Exception as e:
        logger.error(f"Error in main: {e}")
        # Previously the error was only logged and the script exited with
        # status 0; report the failure through the exit code instead.
        raise SystemExit(1)
← Previous Next →
Back to list