ssuperjun 님의 블로그
[과제5-6] DB 모니터링 프로그램 Python 변환 6편: 추가 구현, 조사 본문
slowquery 스크립트
ca801에서 은빈님이 slowquery 스크립트 어떻게 작성했는지, slowquery 작동 방법 알아보기
ca801에 접속해 generate_slowquery.py 코드 확인
이 코드는 MySQL 서버에 SELECT SLEEP(10);을 실행시킴
crontab으로 주기적 실행되도록 해 놓음
crontab -l 명령어로 확인
*/5 * * * * /home1/irteam/venv/bin/python /home1/irteam/script/generate_slowquery.py >> /home1/irteam/logs/cron_fallback.log 2>&1
24시간 내내 5분마다 실행됨
모니터링 명령어 조사
show processlist, innodb status, innodb lock을 왜 하는지, 더 추가하면 좋을 모니터링 명령은 없을지 조사하기
1. SHOW PROCESSLIST
[log.processlist 테이블에 저장될 컬럼 종류]
| 순서 | 컬럼명 | 설명 |
| 0 | logtime | 수집 시점 (YYYYMMDDHHMMSS) |
| 1 | logdate | 수집 일자 (YYYYMMDD) |
| 2 | host | 대상 서버 호스트명 |
| 3 | port | 대상 서버 포트번호 |
| 4 | pid | MySQL 프로세스 ID |
| 5 | user | 유저명 |
| 6 | ip | 접속 IP |
| 7 | db | 현재 접속 중인 DB 이름 |
| 8 | command | 실행 중인 명령 (ex. Query) |
| 9 | execTime | 실행 시간 (초) |
| 10 | state | 쿼리 상태 (Waiting, Locked) |
| 11 | Query (BLOB) | 실행 중인 쿼리 텍스트 |
| 12 | nOrder | 내부 수집 순서 |
pid(4번)는 MySQL 내부에서 쿼리를 실행 중인 세션의 고유 ID
ip는 process2.c를 실행 중인 서버 주소
*MySQL에서의 세션은 클라이언트 1개가 MySQL 서버에 접속해서 연결이 유지되는 것. 연결 종료 시 세션은 사라짐
한 세션에서 여러 쿼리를 실행할 수 있다. 한 시점에 하나만.
[command(8번) 값 종류]
| Command 값 | 의미 |
| Query | 일반적인 쿼리 실행 중 |
| Sleep | 아무 작업 없이 idle 상태 |
| Connect | 클라이언트 연결 중 |
| Binlog Dump | replication 용 스레드 |
| Quit | 종료 요청 중 |
| Kill | kill 명령 대기 중 |
| Execute | Prepared Statement 실행 중 |
| Prepare | Prepared Statement 준비 중 |
[state(10번) 값 종류]
| State 값 | 의미 |
| Sending data | 쿼리 실행 결과 전송 중 |
| Locked | 테이블/행 락에 의해 대기 중 |
| Waiting for table metadata lock | 메타락 대기 |
| Copying to tmp table | 결과를 임시 테이블에 복사 중 |
| Sorting result | 정렬 중 |
| executing | 쿼리 실행 중 |
| statistics | 통계 계산 중 |
| checking permissions | 권한 확인 중 |
| Opening tables | 테이블 오픈 중 |
| Killed | kill 명령 처리 중 |
| Cleaning up | 종료 처리 중 |
| init | 초기화 중 |
| end | 쿼리 종료 처리 중 |
[조건에 따라 경고성 로그 출력하는 부분]
if( (strncmp("Locked", str_state, 6) == 0 &&…))
- 락 대기로 5초 이상 대기 중 -> 주의 필요
- kill 요청 상태 -> 세션 강제 종료 진행 중
- 결과 전송이 5초 이상 -> 대량 데이터 가능성
- 락 대기 혹은 기타 대기 5초 이상
- 실행 중인 쿼리가 5초 초과 -> 느린 쿼리
- 정렬 작업이 5초 초과 -> 정렬 최적화 필요 확인
[SHOW PROCESSLIST 수집 이유]
현재 MySQL 내 실행 중인 쿼리 목록을 확인하기 위함
각 쿼리의 상태(Command, State, Time, User, Host, Query) 등을 통해
- idle 상태(Command 값이 sleep)가 오래 지속되는 것 체크
- 오래 걸리는 쿼리(실행 시간이 일정 시간(5초)보다 큰 경우)
- killed, locked, sending data, waiting 등 체크
확인
=> 쿼리 병목 분석, 지연이나 락 대기의 조기 감지, 과도한 접속(Command 값이 connect) 감지
2. SHOW ENGINE INNODB STATUS
[log.innodb 테이블에 저장될 컬럼 종류]
| 컬럼명 | 설명 |
| logtime | 수집 시각 |
| host | 대상 서버명 |
| port | 포트 번호 |
| nOrder | 스레드 순번 |
| Status | InnoDB 상태 (압축 BLOB) |
Status 값은 InnoDB 엔진의 내부 상태 요약으로, 하나의 긴 텍스트 블록 형태로 나옴
BLOB은 문자열을 압축해 binary(이진)로 저장한 형태
예시
=====================================
2026-01-18 13:41:28 INNODB MONITOR OUTPUT
=====================================
Per second averages calculated from the last 3 seconds
-----------------
BACKGROUND THREAD
-----------------
srv_master_thread loops: 12447 1_second, 12446 sleeps, 1244 10_second, 473 pages flushed
...
------------
TRANSACTIONS
------------
Trx id counter 0 217950872
Purge done for trx's n:o < 0 217950870 undo n:o < 0 0
History list length 2
LIST OF TRANSACTIONS FOR EACH SESSION:
---TRANSACTION 217950871, ACTIVE 20 sec
2 lock struct(s), heap size 376, 1 row lock(s)
MySQL thread id 123456, OS thread handle 12345678, query id 54321 localhost root
*Status는 단순 문자열이기 때문에 Python 등으로 분석 시에는 UNCOMPRESS(Status) 후 파싱해야 됨
[Status 값의 주요 구성]
| 섹션 제목 | 설명 |
| BACKGROUND THREAD | InnoDB 백그라운드 작업 상태 |
| SEMAPHORES | 락/뮤텍스 충돌 통계 |
| LATEST DETECTED DEADLOCK | 최근 Deadlock 발생 내역 |
| TRANSACTIONS | 트랜잭션 상태, History List Length |
| FILE I/O | 디스크 I/O 상태, flush 수, pending |
| BUFFER POOL AND MEMORY | 버퍼풀 캐시 효율 |
| ROW OPERATIONS | 레코드 삽입/업데이트 통계 |
| LATEST FOREIGN KEY ERROR | 최근 외래키 오류 |
[수집 이유]
InnoDB 스토리지 엔진의 내부 상태 정보를 확인하기 위함
- Deadlock 정보
- 트랜잭션 상태
- 락 테이블 상황
- 버퍼 풀 및 I/O 상태
=> 데드락, 버퍼 풀의 메모리 병목 감지, 디스크 I/O 병목 징후 감지
3. SHOW INNODB LOCK
[log.innodb_lock 테이블에 저장될 컬럼 종류]
| 컬럼명 | 설명 |
| logtime, logdate | 수집 시각 |
| host, port | 서버 식별자 |
| waiting_trx_id | 락을 기다리는 트랜잭션 ID |
| waiting_thread_id | 해당 세션의 스레드 ID (SHOW PROCESSLIST의 Id와 동일) |
| waiting_time | 대기 시간 |
| waiting_query | 대기 중 쿼리 내용 |
| blocking_trx_id | 락을 보유한 트랜잭션 ID |
| blocking_thread_id | 블로킹 세션의 스레드 ID |
| blocking_time | 락 점유 시간 |
| blocking_query | 블로킹 쿼리 내용 |
[수집 이유]
트랜잭션 간 락 충돌 상황을 확인
누가 누굴 막고 있는지 명확히 분석 가능
waiting_trx_id: 어떤 쿼리가 기다리고 있는지
blocking_trx_id: 어떤 쿼리가 블로킹하고 있는지
=> 데드락 징후 감지
더 추가하면 좋을 모니터링 명령
1. SHOW GLOBAL STATUS, SHOW GLOBAL VARIABLES
전체 인스턴스의 성능 지표, 설정 값 확인
- Threads_connected, Threads_running
- Slow_queries
- Innodb_buffer_pool_reads, Innodb_log_waits
실시간 부하 추이, 커넥션 수 급증, slow query 비율 증가 등 파악 가능
옵티마이저가 사용할 통계 정보와 관련해서 innodb_stats_persistent, innodb_stats_auto_recalc 등으로 통계 정보 저장(테이블별로 통계 저장할건지), 자동 갱신 정책 확인 가능
2. INFORMATION_SCHEMA.TABLES, INFORMATION_SCHEMA.INDEXES
테이블/인덱스 크기 분석
사용되지 않는 인덱스, 과도한 테이블 증가 감지
=> 불필요한 인덱스 정리, 들어오는 데이터가 늘어났는지 확인
그 외
EXPLAIN [FORMAT=JSON] <쿼리>
실행 계획 확인: 인덱스 사용 여부, 조인 순서 등(인덱스 사용에서 풀스캔으로 바뀐건지, 조인 방식이 바뀐건지)
.sh, bash profile 이용한 설치 과정, 명령어 단순화
의존성 패키지 설치, mysql client 설치 부분을 .sh로 한번에 가능하도록 만들기
/home1/irteam 경로에 setup_process2.sh 생성
-- setup_process2.sh
#!/bin/bash
# Exit on error
set -e
echo "📦 [1] Updating package list..."
sudo apt update
echo "🐬 [2] Installing MySQL client..."
sudo apt install -y mysql-client
echo "✅ MySQL Client version:"
mysql --version
echo "🔧 [3] Installing C dependencies (libmysqlclient-dev)..."
sudo apt install -y libmysqlclient-dev
echo "🐍 [4] Installing Python & pip..."
sudo apt install -y python3 python3-pip
echo "🧱 [5] Installing Python 3.12 venv..."
sudo apt install -y python3.12-venv
echo "📁 [6] Creating Python virtual environment at /home1/irteam/venv-process2..."
python3 -m venv /home1/irteam/venv-process2
echo "📲 [7] Activating virtual environment and installing pymysql..."
source /home1/irteam/venv-process2/bin/activate
pip install pymysql
echo "✅ All setup steps completed successfully."
권한 부여
chmod +x setup_process2.sh
실행
./setup_process2.sh
*이 파일 내 명령어들은 sudo 명령어이므로 irteamsu에서 이 파일을 실행해야 함
bash profile로 python 실행 명령어 단순화하기
bash profile: 프로그램들이 공통으로 참고해야 할 정보(환경 변수)를 여기서 설정. 사용자가 로그인할 때 한번 자동으로 실행됨
매번 /home1/irteam/script/myproc/myproc이라고 길게 치는 대신 export PATH=$PATH:/home1/irteam/script/myproc으로 PATH를 등록해 놓으면 그냥 myproc만 쳐도 실행됨
가상환경 활성화
source /home1/irteam/venv-process2/bin/activate
실행
python3 process2.py username
이걸 runproc2py username 만으로 실행 가능하도록 바꾸기
.bashrc 열기
nano ~/.bashrc
맨 아래에 다음 줄 추가
alias runproc2py='source /home1/irteam/venv-process2/bin/activate && python3 /home1/irteam/bjpark00/myproc/process2.py'
적용
source ~/.bashrc
실행
runproc2py username
이제 runproc2py username만 실행해도 된다.
*.bash_profile에 등록하는 것도 가능하지만, 긴 명령어 줄이기(Alias)는 .bashrc 파일에 적는 것이 관례
Python 실행이 끝났거나 Ctrl+C (KeyboardInterrupt)가 발생한 경우 가상환경 deactivate하기
alias 대신 함수 정의 사용해야 함
이번에는 .bash_profile에 써보기
vi ~/.bash_profile
함수 작성
runproc2py() {
source /home1/irteam/venv-process2/bin/activate
python3 /home1/irteam/bjpark00/myproc/process2.py "$@"
deactivate
}
적용
source ~/.bash_profile
실행
runproc2py username
실행 결과: Ctrl+C에 의한 비정상 종료 시, 가상환경 비활성화에 실패했음
deactivate 대신
trap 'deactivate; exit' INT TERM EXIT
로도 테스트해봤지만 실패
같은 맥락으로
C 컴파일 부분도 bash profile에 alias를 등록하는 등 단순화시킬 수 있음
gcc -o process2 process2.c -I/usr/include/mysql -lmysqlclient -lpthread
이런거
process2.py 코드 추가 구현
1. 스크립트 실행 디렉토리 안에 log 디렉토리가 없으면 에러 출력 후 종료시키지 말고 log 디렉토리 생성시킨 후 계속 진행시키기
2. 매크로 값으로 정의된 passwd 등은 config 파일(.ini파일: 텍스트 기반의 설정파일)로 따로 저장
3. debug모드를 C와 동일하게 실행되도록 에러 print문만 코드 수정 -> 실행할 땐 DEBUG=1 python3 process2.py username 명령 또는 python3 process2.py username 명령
최종 코드
# process2.py
import sys
import signal
import pymysql
import threading
import time
import datetime
import logging
from dataclasses import dataclass
from typing import List
import configparser
import os
# 환경변수 DEBUG 읽기
DEBUG = os.getenv('DEBUG', '0') == '1'
# ---------- config.ini 불러오기 ----------
config = configparser.ConfigParser()
config.read("config.ini")
# MyMON 설정
# 모니터링 대상 서버 목록이 저장된 일종의 메타 DB
MYMON_HOST = config["MYMON"]["host"]
MYMON_PORT = int(config["MYMON"]["port"])
MYMON_USER = config["MYMON"]["user"]
MYMON_PASSWD = config["MYMON"]["password"]
MYMON_DB = config["MYMON"]["database"]
# Log DB 설정
# 수집된 정보를 기록하는 DB
MYSQL_LOG_HOST = config["LOGDB"]["host"]
MYSQL_LOG_PORT = int(config["LOGDB"]["port"])
MYSQL_LOG_USER = config["LOGDB"]["user"]
MYSQL_LOG_PASSWD = config["LOGDB"]["password"]
MYSQL_LOG_DB = config["LOGDB"]["database"]
# 수집 대상 서버 접속 정보
# ThreadProcessList() 내부에서 각 dbHost[i]에 접속할 때 사용됨. dbHost[i]는 InitTable()에서 선언된 DB 호스트 정보 배열
MYSQL_USER = config["SERVICE"]["user"]
MYSQL_PASSWD = config["SERVICE"]["password"]
# 기타 일반 설정
INTERVALTIME_SEC = int(config["GENERAL"]["interval"]) # 수집 주기 (초)
THREADCOUNT = int(config["GENERAL"]["threads"]) # 최대 스레드 개수
g_nTimeout = int(config["GENERAL"]["timeout"]) # MySQL 접속 타임아웃 (초)
HOSTNAME_LEN = 25
IP_LEN = 15
LOGFILE_LEN = 18
# ---------- 구조체 ----------
@dataclass
class dbHost_struct:
host: str
port: int
ip: str
service: str
@dataclass
class Interval_Server:
nStart: int
nEnd: int
# ---------- 전역 변수 ----------
dbHost: List[dbHost_struct] = [] # 감시할 서버 정보 리스트
g_IntervalServer: List[Interval_Server] = [] # 스레드별 서버 인덱스 범위
g_nServerCnt = 0 # 감시할 서버 개수
g_strCurrTime = "" # 수집 시간 (로그 timestamp)
g_strCurrDate = ""
g_strPrevTime = ""
g_nCount = 0 # 수집 횟수
strUserID = "" # 사용자 ID (argv[1]에 해당)
g_fCheckLog = None # 로그 파일 핸들
# ---------- 로그 설정 ----------
logging.basicConfig(
filename='monitor.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
# ---------- 시간 문자열 생성 ----------
def get_time_strings():
now = datetime.datetime.now()
g_strCurrTime = now.strftime('%Y%m%d%H%M%S')
g_strCurrDate = now.strftime('%Y%m%d')
return g_strCurrTime, g_strCurrDate
# 감시(모니터링)할 서버 목록을 MyMon DB에서 조회하여 dbHost 배열에 저장
# ThreadMain 함수 첫부분에 한번만 등장할 예정
def InitTable() -> int:
global dbHost, g_nServerCnt, strUserID, g_mysql
strQuery = (
"SELECT host, "
" port, "
" SUBSTRING_INDEX(ip, '|', -1), "
" service_detail "
" FROM MYMON.serverinfo B, MYMON.admin_user C "
" WHERE B.status = 1 "
" AND B.db_charged = CONVERT(C.admin_name USING euckr) "
" AND MONITORING <> '' "
" AND repl_state != 'MM' "
f" AND C.kerberos_id = '{strUserID}' "
" ORDER BY 1"
)
# MyMON DB connect
try:
g_mysql = pymysql.connect(
host=MYMON_HOST,
user=MYMON_USER,
password=MYMON_PASSWD,
database=MYMON_DB,
port=MYMON_PORT,
charset='utf8',
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
print(f"[InitTable] Failed to connect to database: Error: {e}")
return -1
# Cursor create
# 커서: 쿼리 결과를 순회하고 조작하는 객체
try:
cursor = g_mysql.cursor()
except Exception as e:
print(f"[InitTable] Failed to create cursor: Error: {e}")
g_mysql.close()
return -1
# SET NAMES
# UTF-8 문자셋 설정
try:
cursor.execute("SET NAMES utf8")
except Exception as e:
print(f"[InitTable] Failed to set charset: Error: {e}")
cursor.close()
g_mysql.close()
return -1
# Execute query
try:
cursor.execute(strQuery)
rows = cursor.fetchall()
except Exception as e:
print(f"[InitTable] Failed to query database item: Error: {e}")
cursor.close()
g_mysql.close()
return -1
cursor.close()
g_mysql.close()
# Result check
# 쿼리 실행은 됐지만, 쿼리 결과가 없는 경우
if not rows:
print("[InitTable] Select Server Error: result is empty")
return -1
# 쿼리 결과를 받아서, 각 레코드를 순회하며 dbHost[] 배열에 저장
# 이 배열은 이후 ThreadProcessList 함수에서 수집 루프에 사용됨
dbHost.clear()
for row in rows:
dbHost.append(
dbHost_struct(
host=row[0],
port=int(row[1]),
ip=row[2],
service=row[3]
)
)
g_nServerCnt = len(dbHost)
return 0
# 핵심 수집 실행 로직
# 각 스레드별로 수집 대상 서버 목록을 InitTable 함수에서 받아와서, 수집 작업을 병렬 처리한 뒤 결과를 LogDB에 기록
# InitTable 함수 1번 호출, ThreadProcessList 함수 반복 호출됨
def ThreadMain(dummy_arg=None):
global g_IntervalServer, g_nServerCnt, dbHost
global g_strCurrTime, g_strCurrDate, g_nCount, g_fCheckLog
# 현재 시간 문자열 준비
now = datetime.datetime.now()
g_strCurrTime = now.strftime("%Y%m%d%H%M%S")
g_strCurrDate = now.strftime("%Y%m%d")
# 로그 파일명 생성
strFileNameLog = f"./log/_myproc_{strUserID}.log.{now.strftime('%Y%m%d')}"
# 스크립트 실행 디렉토리 안에 log 디렉토리가 없으면 생성
try:
g_fCheckLog = open(strFileNameLog, "a")
except FileNotFoundError:
# DEBUG=1일 때만 print
if DEBUG:
print("Log directory not found. Creating log directory...")
else:
g_fCheckLog.write(f"{now.strftime('%Y-%m-%d %H:%M:%S')}, Log directory not found. Creating log directory...\n")
try:
import os
os.makedirs("./log", exist_ok=True)
g_fCheckLog = open(strFileNameLog, "a")
except Exception as e:
if DEBUG:
print(f"Cannot create log directory Error! {e}")
else:
g_fCheckLog.write(f"{now.strftime('%Y-%m-%d %H:%M:%S')}, Cannot create log directory Error!!\n")
exit(-1)
except Exception as e:
if DEBUG:
print(f"Cannot create log file Error! {e}")
else:
g_fCheckLog.write(f"{now.strftime('%Y-%m-%d %H:%M:%S')}, Cannot create log file Error! {e}\n")
exit(-1)
# 서버 목록 초기화
# 사용자 ID에 해당하는 서버 목록을 DB에서 불러옴 - dbHost[], g_nServerCnt
# 이 부분 실패 시 에러 로그를 파일에 남기기 위해, C 코드와는 실행 순서를 달리 함 -> segmentation fault(segfault) 방지
if InitTable() == -1:
if DEBUG:
print("### MyMon ServerDB tb_serverinfo Table Error!! ###")
else:
g_fCheckLog.write(f"{now.strftime('%Y-%m-%d %H:%M:%S')}, MyMon ServerDB tb_serverinfo Table Error!!\n")
g_fCheckLog.close()
exit(-1)
# 스레드 수 결정
if g_nServerCnt > THREADCOUNT:
nThreadCount = THREADCOUNT
else:
nThreadCount = 1
nIntervalCount = g_nServerCnt // nThreadCount
# 스레드별 할당 범위 설정
g_IntervalServer = [Interval_Server(0, 0) for _ in range(nThreadCount)]
for i in range(nThreadCount):
start_idx = i * nIntervalCount
end_idx = start_idx + nIntervalCount - 1
if i == nThreadCount - 1: # 마지막 스레드는 나머지 포함
end_idx = g_nServerCnt - 1
g_IntervalServer[i] = Interval_Server(nStart=start_idx, nEnd=end_idx)
# 실행 시간 측정 시작
start_time = time.time()
threads = []
for i in range(nThreadCount):
t = threading.Thread(target=ThreadProcessList, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 실행 시간 측정 종료
end_time = time.time()
dElapsedTime = end_time - start_time
per_server = dElapsedTime / g_nServerCnt if g_nServerCnt > 0 else 0
# 로그 기록
log_line = f"\n{now.strftime('%Y-%m-%d %H:%M:%S')}, Server:{g_nServerCnt}, Elapsed:{dElapsedTime:.3f}, perServer:{per_server:.3f}\n"
if DEBUG:
print(log_line.strip())
else:
g_fCheckLog.write(log_line)
g_fCheckLog.close()
# 스레드별로 각자 맡은 대상 서버에 접속해 정보 수집, 로그DB에 기록
def ThreadProcessList(nOrder: int):
global dbHost, g_IntervalServer, g_strCurrTime, g_strCurrDate, g_nCount
# Log DB 연결
try:
log_db = pymysql.connect(
host=MYSQL_LOG_HOST,
user=MYSQL_LOG_USER,
password=MYSQL_LOG_PASSWD,
database=MYSQL_LOG_DB,
port=MYSQL_LOG_PORT,
charset='utf8',
cursorclass=pymysql.cursors.Cursor,
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
if DEBUG:
print(f"Failed to connect to log database: Error: {e}")
else:
g_fCheckLog.write(f"{g_strCurrTime}, Failed to connect to meta database: Error: {e}\n")
time.sleep(1)
return
# 테이블명 생성 (YYYYMMDD_HH 형식)
# 테이블 생성은 INSERT 직전에 수행
table_date_time = g_strCurrTime[:8] # YYYYMMDD
table_hour = g_strCurrTime[8:10] # HH
processlist_table = f"processlist_{table_date_time}_{table_hour}"
innodb_table = f"innodb_{table_date_time}_{table_hour}"
innodb_lock_table = f"innodb_lock_{table_date_time}_{table_hour}"
# 서버 범위 가져오기
nStart = g_IntervalServer[nOrder].nStart
nEnd = g_IntervalServer[nOrder].nEnd
for i in range(nStart, nEnd + 1):
host = dbHost[i].host
port = dbHost[i].port
ip = dbHost[i].ip
service = dbHost[i].service
# 대상 서버 접속 시도
try:
svc_db = pymysql.connect(
host=ip,
user=MYSQL_USER,
password=MYSQL_PASSWD,
database='mysql',
port=port,
charset='utf8',
cursorclass=pymysql.cursors.Cursor,
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
if port != 0:
if DEBUG:
print(f" > [{host}:{port}], Failed to connect to database: Error: {e}")
else:
g_fCheckLog.write(f"{g_strCurrTime}, > [{host}], Failed to connect to database: Error: {e}\n")
continue
# 특정 서버 제외 (ifsr-dev*)
if host.startswith("ifsr-dev"):
svc_db.close()
continue
try:
# 쿼리 실행: processlist, innodb status, version, innodb lock
# ================================
# SHOW FULL PROCESSLIST
# ================================
cursor = None
try:
cursor = svc_db.cursor() # 대상 서버 커서(쿼리 결과를 순회하는 객체) 생성
except Exception as e:
if DEBUG:
print(f" * processList cursor create failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
svc_db.close()
continue
try:
cursor.execute("SHOW FULL PROCESSLIST") # C 코드에선 strProcess 문자열을 사용했으나, 여기선 바로 쿼리문을 넣음. 가독성
result = cursor.fetchall()
except Exception as e:
if DEBUG:
print(f" * processList query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
svc_db.close()
continue # goto QUERYERROR;과 같은 효과
if not result:
if DEBUG:
print(f" * processList mysql_num_rows is Zero: {host} {port}")
cursor.close()
svc_db.close()
continue
for row in result:
# row mapping (SHOW FULL PROCESSLIST)
# 0: Id, 1: User, 2: Host, 3: db, 4: Command, 5: Time, 6: State, 7: Info
# ----------------
# Filter conditions (C 코드 그대로)
# ----------------
if (
(row[1] == "system user" and (row[6] or "").startswith("Waiting for master")) or
row[1] == "event_scheduler" or
(row[4] == "Sleep" and int(row[5] or 0) < 20000) or
row[4] == "Binlog Dump"
):
continue
# ----------------
# 값 매핑
# ----------------
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
str_Pid = str(row[0] or "0")
str_user = row[1] or "NULL"
# ip:port → ip 분리
str_ip = row[2] or "NULL"
if ":" in str_ip:
str_ip = str_ip.split(":")[0]
str_db = row[3] or "NULL"
str_command = row[4] or "NULL"
str_ExecTime = str(row[5] or "0")
str_state = row[6] or "NULL"
str_query = row[7] or "NULL"
nCount = g_nCount
# ----------------
# Warning log 출력 (C 코드 조건 그대로)
# ----------------
if (
(str_state.startswith("Locked") and int(str_ExecTime) > 5) or
str_state.startswith("Killed") or
(str_state.startswith("Sending") and int(str_ExecTime) > 5) or
(str_state.startswith("Waiting") and int(str_ExecTime) > 5) or
(str_command.startswith("Query") and int(str_ExecTime) > 5) or
(str_state.startswith("Sorting") and int(str_ExecTime) > 5)
):
warn_msg = (
f"\n {str_host}:{nPort} [{service}] - "
f"State: {str_state}, Time: {str_ExecTime}, "
f"User: {str_user}@{str_ip}\n"
f" DataBase: {str_db}\n"
f" Query : {str_query}\n"
)
if DEBUG:
print(warn_msg, end="")
else:
g_fCheckLog.write(warn_msg)
# ----------------
# INSERT INTO processlist
# ----------------
try:
log_cursor = log_db.cursor()
except Exception as e:
if DEBUG:
print(f" * processlist insert cursor error: {e}")
continue
try:
# processlist 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_processlist = f"""
CREATE TABLE IF NOT EXISTS `{processlist_table}` (
`logtime` varchar(15) NOT NULL default '',
`logdate` varchar(15) NOT NULL default '',
`host` varchar(20) NOT NULL default '',
`port` int(11) NOT NULL default '0',
`pid` varchar(14) NOT NULL default '',
`user` varchar(20) default NULL,
`ip` varchar(21) NOT NULL default '',
`db` varchar(20) default NULL,
`command` varchar(30) default NULL,
`execTime` varchar(14) default NULL,
`state` varchar(30) default NULL,
`Query` blob,
`nOrder` int(11) NOT NULL default '0',
PRIMARY KEY (`logtime`,`host`,`port`,`ip`,`pid`,`nOrder`),
KEY `idx1` (`logdate`,`host`,`port`,`ip`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_processlist)
cursor_create.close()
log_cursor.execute(
f"""
INSERT INTO `{processlist_table}`
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,COMPRESS(%s),%s)
""",
(
str_logtime,
str_logdate,
str_host,
nPort,
str_Pid,
str_user,
str_ip,
str_db,
str_command,
str_ExecTime,
str_state,
str_query,
nCount,
),
)
log_db.commit()
except Exception as e:
if DEBUG:
print(f" * processlist insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close()
cursor.close()
# ================================
# SHOW ENGINE INNODB STATUS
# ================================
try:
cursor = svc_db.cursor()
cursor.execute("SHOW ENGINE INNODB STATUS")
result = cursor.fetchall()
except Exception as e:
if DEBUG:
print(f" * InnoDB Status query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
if DEBUG:
print(f" * InnoDB Status result is empty: {host} {port}")
cursor.close()
continue
for row in result:
# ----------------
# 버전별 InnoDB Status 위치 분기
# ----------------
if row[0] and row[0].startswith("InnoDB"):
# MySQL 5.1+
str_status = row[2] if row[2] else "NULL"
nFlag = 1
else:
# MySQL 5.0
str_status = row[0] if row[0] else "NULL"
nFlag = 0
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
# ----------------
# INSERT INTO innodb
# ----------------
try:
log_cursor = log_db.cursor()
except Exception as e:
if DEBUG:
print(f" * innodb insert cursor error: {e}")
cursor.close()
continue
try:
# innodb 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_innodb = f"""
CREATE TABLE IF NOT EXISTS `{innodb_table}` (
`logtime` varchar(15) NOT NULL default '',
`host` varchar(20) NOT NULL default '',
`port` int(11) NOT NULL default '0',
`nOrder` int(11) NOT NULL default '0',
`Status` blob,
PRIMARY KEY (`logtime`,`host`,`port`,`nOrder`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_innodb)
cursor_create.close()
log_cursor.execute(
f"""
INSERT INTO `{innodb_table}`
VALUES (%s, %s, %s, %s, COMPRESS(%s))
""",
(
str_logtime,
str_host,
nPort,
nCount,
str_status,
),
)
log_db.commit()
except Exception as e:
if DEBUG:
print(f" * innodb insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close()
cursor.close()
# ================================
# SHOW MYSQL VERSION
# ================================
try:
cursor = svc_db.cursor()
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
except Exception as e:
if DEBUG:
print(f" * VERSION query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
if DEBUG:
print(f" * VERSION result is empty: {host} {port}")
cursor.close()
continue
str_version = result[0]
cursor.close()
# ----------------
# MySQL 버전 파싱
# ----------------
# 예: "5.7.35-log" or "8.0.32"
if str_version.startswith("5.7") or str_version.startswith("5.6"):
str_lock_query = """
SELECT r.trx_mysql_thread_id as waiting_thread,
r.trx_id as waiting_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(r.trx_wait_started) as waiting_time,
r.trx_query as waiting_query,
b.trx_mysql_thread_id as blocking_thread,
b.trx_id as blocking_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(b.trx_started) as blocking_time,
b.trx_query as blocking_query
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id
"""
elif str_version.startswith("8.0"):
str_lock_query = """
SELECT r.trx_mysql_thread_id AS waiting_thread,
r.trx_id AS waiting_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(r.trx_wait_started) AS waiting_time,
r.trx_query AS waiting_query,
b.trx_mysql_thread_id AS blocking_thread,
b.trx_id AS blocking_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(b.trx_started) AS blocking_time,
b.trx_query AS blocking_query
FROM performance_schema.data_lock_waits dw
JOIN information_schema.innodb_trx r ON r.trx_id = dw.REQUESTING_ENGINE_TRANSACTION_ID
JOIN information_schema.innodb_trx b ON b.trx_id = dw.BLOCKING_ENGINE_TRANSACTION_ID
"""
else:
if DEBUG:
print(f" * MySQL Version [{str_version}] not supported for LOCK analysis: {host} {port}")
continue
# ================================
# SHOW INNODB LOCK
# ================================
if nFlag == 0: # MySQL 5.0 이하는 LOCK 분석 스킵
continue
try:
cursor = svc_db.cursor()
cursor.execute(str_lock_query)
result = cursor.fetchall()
except Exception as e:
if DEBUG:
print(f" * InnoDB Lock query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
if DEBUG:
print(f" * InnoDB Lock result is empty: {host} {port}")
cursor.close()
continue
for row in result:
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
try:
log_cursor = log_db.cursor()
except Exception as e:
if DEBUG:
print(f" * innodb_lock insert cursor error: {e}")
continue
try:
# innodb_lock 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_innodb_lock = f"""
CREATE TABLE IF NOT EXISTS `{innodb_lock_table}` (
`logtime` varchar(15) NOT NULL DEFAULT '',
`logdate` varchar(15) NOT NULL DEFAULT '',
`host` varchar(20) NOT NULL DEFAULT '',
`port` int(11) NOT NULL DEFAULT '0',
`waiting_trx_id` varchar(18),
`waiting_thread_id` bigint(21) unsigned,
`waiting_time` int(11),
`waiting_query` varchar(1024),
`blocking_trx_id` varchar(18),
`blocking_thread_id` bigint(21) unsigned,
`blocking_time` int(11),
`blocking_query` varchar(1024),
KEY( host, port, logdate, logtime)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_innodb_lock)
cursor_create.close()
waiting_thread = int(row[0]) if row[0] else 0
waiting_trx_id = row[1] or "0"
waiting_time = int(row[2]) if row[2] else 0
waiting_query = row[3] or "0"
blocking_thread = int(row[4]) if row[4] else 0
blocking_trx_id = row[5] or "0"
blocking_time = int(row[6]) if row[6] else 0
blocking_query = row[7] or "0"
log_cursor.execute(
f'''
INSERT INTO `{innodb_lock_table}`
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
''',
(
str_logtime,
str_logdate,
str_host,
nPort,
waiting_trx_id,
waiting_thread,
waiting_time,
waiting_query,
blocking_trx_id,
blocking_thread,
blocking_time,
blocking_query,
)
)
log_db.commit()
except Exception as e:
if DEBUG:
print(f" * innodb_lock insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close() # log_db 커서 종료
cursor.close() # 대상 서버 커서 종료
finally: # 예외 발생 여부와 상관없이 항상 접속 종료
svc_db.close() # 대상 서버 접속 종료
log_db.close() # Log DB 접속 종료
# 시그널 핸들러: SIGALRM 수신 시 ThreadMain 호출
def signal_handler(signum, frame):
global g_nCount
ThreadMain(1)
g_nCount += 1
# 프로그램 진입점. signal handler 설정, 주기적 호출 준비
# ThreadMain 함수 호출
# 실행 명령어 python process2.py admin01 실행 시, argv[1]은 "admin01"
def main():
global strUserID, g_nCount
if len(sys.argv) != 2:
print("Usage: python process2.py <admin_id>")
sys.exit(1)
strUserID = sys.argv[1]
# 최초 1회 수집 수행
ThreadMain(1)
# 디버그 모드: 무한히 직접 호출
if DEBUG:
while True:
ThreadMain(1)
g_nCount += 1
time.sleep(INTERVALTIME_SEC) # C 코드에선 sleep 없이 바로 호출했으나, 여기선 약간의 지연 추가
return
# 시그널 핸들러 등록
signal.signal(signal.SIGALRM, signal_handler)
while True:
# INTERVALTIME_SEC 초 후 SIGALRM 발생 예약
signal.alarm(INTERVALTIME_SEC)
signal.pause() # SIGALRM 대기
if __name__ == "__main__":
main()
실행 테스트(정상 동작)

debug모드 제거 버전(코드 내에서 DEBUG 변수 TRUE, FALSE로만 제어 가능)
# process2.py
import sys
import signal
import pymysql
import threading
import time
import datetime
import logging
from dataclasses import dataclass
from typing import List
import configparser
# ---------- config.ini 불러오기 ----------
config = configparser.ConfigParser()
config.read("config.ini")
# MyMON 설정
# 모니터링 대상 서버 목록이 저장된 일종의 메타 DB
MYMON_HOST = config["MYMON"]["host"]
MYMON_PORT = int(config["MYMON"]["port"])
MYMON_USER = config["MYMON"]["user"]
MYMON_PASSWD = config["MYMON"]["password"]
MYMON_DB = config["MYMON"]["database"]
# Log DB 설정
# 수집된 정보를 기록하는 DB
MYSQL_LOG_HOST = config["LOGDB"]["host"]
MYSQL_LOG_PORT = int(config["LOGDB"]["port"])
MYSQL_LOG_USER = config["LOGDB"]["user"]
MYSQL_LOG_PASSWD = config["LOGDB"]["password"]
MYSQL_LOG_DB = config["LOGDB"]["database"]
# 수집 대상 서버 접속 정보
# ThreadProcessList() 내부에서 각 dbHost[i]에 접속할 때 사용됨. dbHost[i]는 InitTable()에서 선언된 DB 호스트 정보 배열
MYSQL_USER = config["SERVICE"]["user"]
MYSQL_PASSWD = config["SERVICE"]["password"]
# 기타 일반 설정
INTERVALTIME_SEC = int(config["GENERAL"]["interval"]) # 수집 주기 (초)
THREADCOUNT = int(config["GENERAL"]["threads"]) # 최대 스레드 개수
g_nTimeout = int(config["GENERAL"]["timeout"]) # MySQL 접속 타임아웃 (초)
HOSTNAME_LEN = 25
IP_LEN = 15
LOGFILE_LEN = 18
# ---------- 디버그 모드 ----------
DEBUG = False # 필요 시 True로 설정
# ---------- 구조체 ----------
@dataclass
class dbHost_struct:
host: str
port: int
ip: str
service: str
@dataclass
class Interval_Server:
nStart: int
nEnd: int
# ---------- 전역 변수 ----------
dbHost: List[dbHost_struct] = [] # 감시할 서버 정보 리스트
g_IntervalServer: List[Interval_Server] = [] # 스레드별 서버 인덱스 범위
g_nServerCnt = 0 # 감시할 서버 개수
g_strCurrTime = "" # 수집 시간 (로그 timestamp)
g_strCurrDate = ""
g_strPrevTime = ""
g_nCount = 0 # 수집 횟수
strUserID = "" # 사용자 ID (argv[1]에 해당)
g_fCheckLog = None # 로그 파일 핸들
# ---------- 로그 설정 ----------
logging.basicConfig(
filename='monitor.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
# ---------- 시간 문자열 생성 ----------
def get_time_strings():
now = datetime.datetime.now()
g_strCurrTime = now.strftime('%Y%m%d%H%M%S')
g_strCurrDate = now.strftime('%Y%m%d')
return g_strCurrTime, g_strCurrDate
# 감시(모니터링)할 서버 목록을 MyMon DB에서 조회하여 dbHost 배열에 저장
# ThreadMain 함수 첫부분에 한번만 등장할 예정
def InitTable() -> int:
global dbHost, g_nServerCnt, strUserID, g_mysql
strQuery = (
"SELECT host, "
" port, "
" SUBSTRING_INDEX(ip, '|', -1), "
" service_detail "
" FROM MYMON.serverinfo B, MYMON.admin_user C "
" WHERE B.status = 1 "
" AND B.db_charged = CONVERT(C.admin_name USING euckr) "
" AND MONITORING <> '' "
" AND repl_state != 'MM' "
f" AND C.kerberos_id = '{strUserID}' "
" ORDER BY 1"
)
# MyMON DB connect
try:
g_mysql = pymysql.connect(
host=MYMON_HOST,
user=MYMON_USER,
password=MYMON_PASSWD,
database=MYMON_DB,
port=MYMON_PORT,
charset='utf8',
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
print(f"[InitTable] Failed to connect to database: Error: {e}")
return -1
# Cursor create
# 커서: 쿼리 결과를 순회하고 조작하는 객체
try:
cursor = g_mysql.cursor()
except Exception as e:
print(f"[InitTable] Failed to create cursor: Error: {e}")
g_mysql.close()
return -1
# SET NAMES
# UTF-8 문자셋 설정
try:
cursor.execute("SET NAMES utf8")
except Exception as e:
print(f"[InitTable] Failed to set charset: Error: {e}")
cursor.close()
g_mysql.close()
return -1
# Execute query
try:
cursor.execute(strQuery)
rows = cursor.fetchall()
except Exception as e:
print(f"[InitTable] Failed to query database item: Error: {e}")
cursor.close()
g_mysql.close()
return -1
cursor.close()
g_mysql.close()
# Result check
# 쿼리 실행은 됐지만, 쿼리 결과가 없는 경우
if not rows:
print("[InitTable] Select Server Error: result is empty")
return -1
# 쿼리 결과를 받아서, 각 레코드를 순회하며 dbHost[] 배열에 저장
# 이 배열은 이후 ThreadProcessList 함수에서 수집 루프에 사용됨
dbHost.clear()
for row in rows:
dbHost.append(
dbHost_struct(
host=row[0],
port=int(row[1]),
ip=row[2],
service=row[3]
)
)
g_nServerCnt = len(dbHost)
return 0
# 핵심 수집 실행 로직
# 각 스레드별로 수집 대상 서버 목록을 InitTable 함수에서 받아와서, 수집 작업을 병렬 처리한 뒤 결과를 LogDB에 기록
# InitTable 함수 1번 호출, ThreadProcessList 함수 반복 호출됨
def ThreadMain(dummy_arg=None):
global g_IntervalServer, g_nServerCnt, dbHost
global g_strCurrTime, g_strCurrDate, g_nCount, g_fCheckLog
# 현재 시간 문자열 준비
now = datetime.datetime.now()
g_strCurrTime = now.strftime("%Y%m%d%H%M%S")
g_strCurrDate = now.strftime("%Y%m%d")
# 로그 파일명 생성
strFileNameLog = f"./log/_myproc_{strUserID}.log.{now.strftime('%Y%m%d')}"
# 스크립트 실행 디렉토리 안에 log 디렉토리가 없으면 생성
try:
g_fCheckLog = open(strFileNameLog, "a")
except FileNotFoundError:
print(f"Log directory not found. Creating log directory...")
try:
import os
os.makedirs("./log", exist_ok=True)
g_fCheckLog = open(strFileNameLog, "a")
except Exception as e:
print(f"Cannot create log directory Error! {e}")
exit(-1)
except Exception as e:
print(f"Cannot create log file Error! {e}")
exit(-1)
# 서버 목록 초기화
# 사용자 ID에 해당하는 서버 목록을 DB에서 불러옴 - dbHost[], g_nServerCnt
# 이 부분 실패 시 에러 로그를 파일에 남기기 위해, C 코드와는 실행 순서를 달리 함 -> segmentation fault(segfault) 방지
if InitTable() == -1:
print("### MyMon ServerDB tb_serverinfo Table Error!! ###")
g_fCheckLog.write(f"{now.strftime('%Y-%m-%d %H:%M:%S')}, MyMon ServerDB tb_serverinfo Table Error!!\n")
g_fCheckLog.close()
exit(-1)
# 스레드 수 결정
if g_nServerCnt > THREADCOUNT:
nThreadCount = THREADCOUNT
else:
nThreadCount = 1
nIntervalCount = g_nServerCnt // nThreadCount
# 스레드별 할당 범위 설정
g_IntervalServer = [Interval_Server(0, 0) for _ in range(nThreadCount)]
for i in range(nThreadCount):
start_idx = i * nIntervalCount
end_idx = start_idx + nIntervalCount - 1
if i == nThreadCount - 1: # 마지막 스레드는 나머지 포함
end_idx = g_nServerCnt - 1
g_IntervalServer[i] = Interval_Server(nStart=start_idx, nEnd=end_idx)
# 실행 시간 측정 시작
start_time = time.time()
threads = []
for i in range(nThreadCount):
t = threading.Thread(target=ThreadProcessList, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 실행 시간 측정 종료
end_time = time.time()
dElapsedTime = end_time - start_time
per_server = dElapsedTime / g_nServerCnt if g_nServerCnt > 0 else 0
# 로그 기록
log_line = f"\n{now.strftime('%Y-%m-%d %H:%M:%S')}, Server:{g_nServerCnt}, Elapsed:{dElapsedTime:.3f}, perServer:{per_server:.3f}\n"
print(log_line.strip())
g_fCheckLog.write(log_line)
g_fCheckLog.close()
# 스레드별로 각자 맡은 대상 서버에 접속해 정보 수집, 로그DB에 기록
def ThreadProcessList(nOrder: int):
global dbHost, g_IntervalServer, g_strCurrTime, g_strCurrDate, g_nCount
# Log DB 연결
try:
log_db = pymysql.connect(
host=MYSQL_LOG_HOST,
user=MYSQL_LOG_USER,
password=MYSQL_LOG_PASSWD,
database=MYSQL_LOG_DB,
port=MYSQL_LOG_PORT,
charset='utf8',
cursorclass=pymysql.cursors.Cursor,
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
print(f"Failed to connect to meta database: Error: {e}")
time.sleep(1)
return
# 테이블명 생성 (YYYYMMDD_HH 형식)
# 테이블 생성은 INSERT 직전에 수행
table_date_time = g_strCurrTime[:8] # YYYYMMDD
table_hour = g_strCurrTime[8:10] # HH
processlist_table = f"processlist_{table_date_time}_{table_hour}"
innodb_table = f"innodb_{table_date_time}_{table_hour}"
innodb_lock_table = f"innodb_lock_{table_date_time}_{table_hour}"
# 서버 범위 가져오기
nStart = g_IntervalServer[nOrder].nStart
nEnd = g_IntervalServer[nOrder].nEnd
for i in range(nStart, nEnd + 1):
host = dbHost[i].host
port = dbHost[i].port
ip = dbHost[i].ip
service = dbHost[i].service
# 대상 서버 접속 시도
try:
svc_db = pymysql.connect(
host=ip,
user=MYSQL_USER,
password=MYSQL_PASSWD,
database='mysql',
port=port,
charset='utf8',
cursorclass=pymysql.cursors.Cursor,
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
if port != 0:
print(f" > [{host}:{port}], Failed to connect to database: Error: {e}")
continue
# 특정 서버 제외 (ifsr-dev*)
if host.startswith("ifsr-dev"):
svc_db.close()
continue
try:
# 쿼리 실행: processlist, innodb status, version, innodb lock
# ================================
# SHOW FULL PROCESSLIST
# ================================
cursor = None
try:
cursor = svc_db.cursor() # 대상 서버 커서(쿼리 결과를 순회하는 객체) 생성
except Exception as e:
print(f" * processList cursor create failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
svc_db.close()
continue
try:
cursor.execute("SHOW FULL PROCESSLIST") # C 코드에선 strProcess 문자열을 사용했으나, 여기선 바로 쿼리문을 넣음. 가독성
result = cursor.fetchall()
except Exception as e:
print(f" * Query processList Result result is null: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
svc_db.close()
continue # goto QUERYERROR;과 같은 효과
if not result:
print(f" * processList mysql_num_rows is Zero: {host} {port}")
cursor.close()
svc_db.close()
continue
for row in result:
# row mapping (SHOW FULL PROCESSLIST)
# 0: Id, 1: User, 2: Host, 3: db, 4: Command, 5: Time, 6: State, 7: Info
# ----------------
# Filter conditions (C 코드 그대로)
# ----------------
if (
(row[1] == "system user" and (row[6] or "").startswith("Waiting for master")) or
row[1] == "event_scheduler" or
(row[4] == "Sleep" and int(row[5] or 0) < 20000) or
row[4] == "Binlog Dump"
):
continue
# ----------------
# 값 매핑
# ----------------
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
str_Pid = str(row[0] or "0")
str_user = row[1] or "NULL"
# ip:port → ip 분리
str_ip = row[2] or "NULL"
if ":" in str_ip:
str_ip = str_ip.split(":")[0]
str_db = row[3] or "NULL"
str_command = row[4] or "NULL"
str_ExecTime = str(row[5] or "0")
str_state = row[6] or "NULL"
str_query = row[7] or "NULL"
nCount = g_nCount
# ----------------
# Warning log 출력 (C 코드 조건 그대로)
# ----------------
if (
(str_state.startswith("Locked") and int(str_ExecTime) > 5) or
str_state.startswith("Killed") or
(str_state.startswith("Sending") and int(str_ExecTime) > 5) or
(str_state.startswith("Waiting") and int(str_ExecTime) > 5) or
(str_command.startswith("Query") and int(str_ExecTime) > 5) or
(str_state.startswith("Sorting") and int(str_ExecTime) > 5)
):
warn_msg = (
f"\n {str_host}:{nPort} [{service}] - "
f"State: {str_state}, Time: {str_ExecTime}, "
f"User: {str_user}@{str_ip}\n"
f" DataBase: {str_db}\n"
f" Query : {str_query}\n"
)
print(warn_msg, end="")
if g_fCheckLog:
g_fCheckLog.write(warn_msg)
# ----------------
# INSERT INTO processlist
# ----------------
try:
log_cursor = log_db.cursor()
except Exception as e:
print(f" * processlist insert cursor error: {e}")
continue
try:
# processlist 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_processlist = f"""
CREATE TABLE IF NOT EXISTS `{processlist_table}` (
`logtime` varchar(15) NOT NULL default '',
`logdate` varchar(15) NOT NULL default '',
`host` varchar(20) NOT NULL default '',
`port` int(11) NOT NULL default '0',
`pid` varchar(14) NOT NULL default '',
`user` varchar(20) default NULL,
`ip` varchar(21) NOT NULL default '',
`db` varchar(20) default NULL,
`command` varchar(30) default NULL,
`execTime` varchar(14) default NULL,
`state` varchar(30) default NULL,
`Query` blob,
`nOrder` int(11) NOT NULL default '0',
PRIMARY KEY (`logtime`,`host`,`port`,`ip`,`pid`,`nOrder`),
KEY `idx1` (`logdate`,`host`,`port`,`ip`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_processlist)
cursor_create.close()
log_cursor.execute(
f"""
INSERT INTO `{processlist_table}`
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,COMPRESS(%s),%s)
""",
(
str_logtime,
str_logdate,
str_host,
nPort,
str_Pid,
str_user,
str_ip,
str_db,
str_command,
str_ExecTime,
str_state,
str_query,
nCount,
),
)
log_db.commit()
except Exception as e:
print(f" * processlist insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close()
cursor.close()
# ================================
# SHOW ENGINE INNODB STATUS
# ================================
try:
cursor = svc_db.cursor()
cursor.execute("SHOW ENGINE INNODB STATUS")
result = cursor.fetchall()
except Exception as e:
print(f" * InnoDB Status query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
print(f" * InnoDB Status result is empty: {host} {port}")
cursor.close()
continue
for row in result:
# ----------------
# 버전별 InnoDB Status 위치 분기
# ----------------
if row[0] and row[0].startswith("InnoDB"):
# MySQL 5.1+
str_status = row[2] if row[2] else "NULL"
nFlag = 1
else:
# MySQL 5.0
str_status = row[0] if row[0] else "NULL"
nFlag = 0
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
# ----------------
# INSERT INTO innodb
# ----------------
try:
log_cursor = log_db.cursor()
except Exception as e:
print(f" * innodb insert cursor error: {e}")
cursor.close()
continue
try:
# innodb 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_innodb = f"""
CREATE TABLE IF NOT EXISTS `{innodb_table}` (
`logtime` varchar(15) NOT NULL default '',
`host` varchar(20) NOT NULL default '',
`port` int(11) NOT NULL default '0',
`nOrder` int(11) NOT NULL default '0',
`Status` blob,
PRIMARY KEY (`logtime`,`host`,`port`,`nOrder`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_innodb)
cursor_create.close()
log_cursor.execute(
f"""
INSERT INTO `{innodb_table}`
VALUES (%s, %s, %s, %s, COMPRESS(%s))
""",
(
str_logtime,
str_host,
nPort,
nCount,
str_status,
),
)
log_db.commit()
except Exception as e:
print(f" * innodb insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close()
cursor.close()
# ================================
# SHOW MYSQL VERSION
# ================================
try:
cursor = svc_db.cursor()
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
except Exception as e:
print(f" * VERSION query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
print(f" * VERSION result is empty: {host} {port}")
cursor.close()
continue
str_version = result[0]
cursor.close()
# ----------------
# MySQL 버전 파싱
# ----------------
# 예: "5.7.35-log" or "8.0.32"
if str_version.startswith("5.7") or str_version.startswith("5.6"):
str_lock_query = """
SELECT r.trx_mysql_thread_id as waiting_thread,
r.trx_id as waiting_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(r.trx_wait_started) as waiting_time,
r.trx_query as waiting_query,
b.trx_mysql_thread_id as blocking_thread,
b.trx_id as blocking_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(b.trx_started) as blocking_time,
b.trx_query as blocking_query
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id
"""
elif str_version.startswith("8.0"):
str_lock_query = """
SELECT r.trx_mysql_thread_id AS waiting_thread,
r.trx_id AS waiting_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(r.trx_wait_started) AS waiting_time,
r.trx_query AS waiting_query,
b.trx_mysql_thread_id AS blocking_thread,
b.trx_id AS blocking_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(b.trx_started) AS blocking_time,
b.trx_query AS blocking_query
FROM performance_schema.data_lock_waits dw
JOIN information_schema.innodb_trx r ON r.trx_id = dw.REQUESTING_ENGINE_TRANSACTION_ID
JOIN information_schema.innodb_trx b ON b.trx_id = dw.BLOCKING_ENGINE_TRANSACTION_ID
"""
else:
print(f" * MySQL Version [{str_version}] not supported for LOCK analysis: {host} {port}")
continue
# ================================
# SHOW INNODB LOCK
# ================================
if nFlag == 0: # MySQL 5.0 이하는 LOCK 분석 스킵
continue
try:
cursor = svc_db.cursor()
cursor.execute(str_lock_query)
result = cursor.fetchall()
except Exception as e:
print(f" * InnoDB Lock query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
print(f" * InnoDB Lock result is empty: {host} {port}")
cursor.close()
continue
for row in result:
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
try:
log_cursor = log_db.cursor()
except Exception as e:
print(f" * innodb_lock insert cursor error: {e}")
continue
try:
# innodb_lock 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_innodb_lock = f"""
CREATE TABLE IF NOT EXISTS `{innodb_lock_table}` (
`logtime` varchar(15) NOT NULL DEFAULT '',
`logdate` varchar(15) NOT NULL DEFAULT '',
`host` varchar(20) NOT NULL DEFAULT '',
`port` int(11) NOT NULL DEFAULT '0',
`waiting_trx_id` varchar(18),
`waiting_thread_id` bigint(21) unsigned,
`waiting_time` int(11),
`waiting_query` varchar(1024),
`blocking_trx_id` varchar(18),
`blocking_thread_id` bigint(21) unsigned,
`blocking_time` int(11),
`blocking_query` varchar(1024),
KEY( host, port, logdate, logtime)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_innodb_lock)
cursor_create.close()
waiting_thread = int(row[0]) if row[0] else 0
waiting_trx_id = row[1] or "0"
waiting_time = int(row[2]) if row[2] else 0
waiting_query = row[3] or "0"
blocking_thread = int(row[4]) if row[4] else 0
blocking_trx_id = row[5] or "0"
blocking_time = int(row[6]) if row[6] else 0
blocking_query = row[7] or "0"
log_cursor.execute(
f'''
INSERT INTO `{innodb_lock_table}`
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
''',
(
str_logtime,
str_logdate,
str_host,
nPort,
waiting_trx_id,
waiting_thread,
waiting_time,
waiting_query,
blocking_trx_id,
blocking_thread,
blocking_time,
blocking_query,
)
)
log_db.commit()
except Exception as e:
print(f" * innodb_lock insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close() # log_db 커서 종료
cursor.close() # 대상 서버 커서 종료
finally: # 예외 발생 여부와 상관없이 항상 접속 종료
svc_db.close() # 대상 서버 접속 종료
log_db.close() # Log DB 접속 종료
# 시그널 핸들러: SIGALRM 수신 시 ThreadMain 호출
def signal_handler(signum, frame):
global g_nCount
ThreadMain(1)
g_nCount += 1
# 프로그램 진입점. signal handler 설정, 주기적 호출 준비
# ThreadMain 함수 호출
# 실행 명령어 python process2.py admin01 실행 시, argv[1]은 "admin01"
def main():
global strUserID, g_nCount
if len(sys.argv) != 2:
print("Usage: python process2.py <admin_id>")
sys.exit(1)
strUserID = sys.argv[1]
# 최초 1회 수집 수행
ThreadMain(1)
# 디버그 모드: 무한히 직접 호출
if DEBUG:
while True:
ThreadMain(1)
g_nCount += 1
time.sleep(INTERVALTIME_SEC) # C 코드에선 sleep 없이 바로 호출했으나, 여기선 약간의 지연 추가
return
# 시그널 핸들러 등록
signal.signal(signal.SIGALRM, signal_handler)
while True:
# INTERVALTIME_SEC 초 후 SIGALRM 발생 예약
signal.alarm(INTERVALTIME_SEC)
signal.pause() # SIGALRM 대기
if __name__ == "__main__":
main()
기존 코드
# process2.py
import sys
import signal
import pymysql
import threading
import time
import datetime
import logging
from dataclasses import dataclass
from typing import List
# ---------- 설정값 ----------
# MyMON DB 접속 정보
# 모니터링 대상 서버 목록이 저장된 일종의 메타 DB
MYMON_HOST = "" # 유출 안되게 주의
MYMON_USER = "admin_2026"
MYMON_PASSWD = ""
MYMON_DB = "MYMON"
MYMON_PORT = 13306
# 로그 DB 접속 정보
# 수집된 정보를 기록하는 DB
MYSQL_LOG_HOST = ""
MYSQL_LOG_USER = "admin_2026"
MYSQL_LOG_PASSWD = ""
MYSQL_LOG_DB = "log"
MYSQL_LOG_PORT = 13306
# ServiceDB(수집 대상 서버) 접속 정보
# ThreadProcessList() 내부에서 각 dbHost[i]에 접속할 때 사용됨. dbHost[i]는 InitTable()에서 선언된 DB 호스트 정보 배열
MYSQL_USER = "admin"
MYSQL_PASSWD = ""
MYSQLHOST_NUM = 2000 # 감시할 서버 최대 개수
INTERVALTIME_SEC = 1 # 수집 주기 (1초)
THREADCOUNT = 15 # 최대 스레드 개수
HOSTNAME_LEN = 25
IP_LEN = 15
LOGFILE_LEN = 18
# ---------- 디버그 모드 ----------
DEBUG = False # 필요 시 True로 설정
# ---------- 구조체 ----------
@dataclass
class dbHost_struct:
host: str
port: int
ip: str
service: str
@dataclass
class Interval_Server:
nStart: int
nEnd: int
# ---------- 전역 변수 ----------
dbHost: List[dbHost_struct] = [] # 감시할 서버 정보 리스트
g_IntervalServer: List[Interval_Server] = [] # 스레드별 서버 인덱스 범위
g_nServerCnt = 0 # 감시할 서버 개수
g_strCurrTime = "" # 수집 시간 (로그 timestamp)
g_strCurrDate = ""
g_strPrevTime = ""
g_nCount = 0 # 수집 횟수
strUserID = "" # 사용자 ID (argv[1]에 해당)
g_fCheckLog = None # 로그 파일 핸들
g_nTimeout = 2 # MySQL 접속 타임아웃 (초)
# ---------- 로그 설정 ----------
logging.basicConfig(
filename='monitor.log',
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s'
)
# ---------- 시간 문자열 생성 ----------
def get_time_strings():
now = datetime.datetime.now()
g_strCurrTime = now.strftime('%Y%m%d%H%M%S')
g_strCurrDate = now.strftime('%Y%m%d')
return g_strCurrTime, g_strCurrDate
# 감시(모니터링)할 서버 목록을 MyMon DB에서 조회하여 dbHost 배열에 저장
# ThreadMain 함수 첫부분에 한번만 등장할 예정
def InitTable() -> int:
global dbHost, g_nServerCnt, strUserID, g_mysql
strQuery = (
"SELECT host, "
" port, "
" SUBSTRING_INDEX(ip, '|', -1), "
" service_detail "
" FROM MYMON.serverinfo B, MYMON.admin_user C "
" WHERE B.status = 1 "
" AND B.db_charged = CONVERT(C.admin_name USING euckr) "
" AND MONITORING <> '' "
" AND repl_state != 'MM' "
f" AND C.kerberos_id = '{strUserID}' "
" ORDER BY 1"
)
# MyMON DB connect
try:
g_mysql = pymysql.connect(
host=MYMON_HOST,
user=MYMON_USER,
password=MYMON_PASSWD,
database=MYMON_DB,
port=MYMON_PORT,
charset='utf8',
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
print(f"[InitTable] Failed to connect to database: Error: {e}")
return -1
# Cursor create
# 커서: 쿼리 결과를 순회하고 조작하는 객체
try:
cursor = g_mysql.cursor()
except Exception as e:
print(f"[InitTable] Failed to create cursor: Error: {e}")
g_mysql.close()
return -1
# SET NAMES
# UTF-8 문자셋 설정
try:
cursor.execute("SET NAMES utf8")
except Exception as e:
print(f"[InitTable] Failed to set charset: Error: {e}")
cursor.close()
g_mysql.close()
return -1
# Execute query
try:
cursor.execute(strQuery)
rows = cursor.fetchall()
except Exception as e:
print(f"[InitTable] Failed to query database item: Error: {e}")
cursor.close()
g_mysql.close()
return -1
cursor.close()
g_mysql.close()
# Result check
# 쿼리 실행은 됐지만, 쿼리 결과가 없는 경우
if not rows:
print("[InitTable] Select Server Error: result is empty")
return -1
# 쿼리 결과를 받아서, 각 레코드를 순회하며 dbHost[] 배열에 저장
# 이 배열은 이후 ThreadProcessList 함수에서 수집 루프에 사용됨
dbHost.clear()
for row in rows:
dbHost.append(
dbHost_struct(
host=row[0],
port=int(row[1]),
ip=row[2],
service=row[3]
)
)
g_nServerCnt = len(dbHost)
return 0
# 핵심 수집 실행 로직
# 각 스레드별로 수집 대상 서버 목록을 InitTable 함수에서 받아와서, 수집 작업을 병렬 처리한 뒤 결과를 LogDB에 기록
# InitTable 함수 1번 호출, ThreadProcessList 함수 반복 호출됨
def ThreadMain(dummy_arg=None):
global g_IntervalServer, g_nServerCnt, dbHost
global g_strCurrTime, g_strCurrDate, g_nCount, g_fCheckLog
# 현재 시간 문자열 준비
now = datetime.datetime.now()
g_strCurrTime = now.strftime("%Y%m%d%H%M%S")
g_strCurrDate = now.strftime("%Y%m%d")
# 로그 파일명 생성
strFileNameLog = f"./log/_myproc_{strUserID}.log.{now.strftime('%Y%m%d')}"
# 스크립트 실행 디렉토리 안에 log 디렉토리가 없으면 에러
try:
g_fCheckLog = open(strFileNameLog, "a")
except Exception as e:
print(f"Cannot create log file Error! {e}")
exit(-1)
# 서버 목록 초기화
# 사용자 ID에 해당하는 서버 목록을 DB에서 불러옴 - dbHost[], g_nServerCnt
# 이 부분 실패 시 에러 로그를 파일에 남기기 위해, C 코드와는 실행 순서를 달리 함 -> segmentation fault(segfault) 방지
if InitTable() == -1:
print("### MyMon ServerDB tb_serverinfo Table Error!! ###")
g_fCheckLog.write(f"{now.strftime('%Y-%m-%d %H:%M:%S')}, MyMon ServerDB tb_serverinfo Table Error!!\n")
g_fCheckLog.close()
exit(-1)
# 스레드 수 결정
if g_nServerCnt > THREADCOUNT:
nThreadCount = THREADCOUNT
else:
nThreadCount = 1
nIntervalCount = g_nServerCnt // nThreadCount
# 스레드별 할당 범위 설정
g_IntervalServer = [Interval_Server(0, 0) for _ in range(nThreadCount)]
for i in range(nThreadCount):
start_idx = i * nIntervalCount
end_idx = start_idx + nIntervalCount - 1
if i == nThreadCount - 1: # 마지막 스레드는 나머지 포함
end_idx = g_nServerCnt - 1
g_IntervalServer[i] = Interval_Server(nStart=start_idx, nEnd=end_idx)
# 실행 시간 측정 시작
start_time = time.time()
threads = []
for i in range(nThreadCount):
t = threading.Thread(target=ThreadProcessList, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
# 실행 시간 측정 종료
end_time = time.time()
dElapsedTime = end_time - start_time
per_server = dElapsedTime / g_nServerCnt if g_nServerCnt > 0 else 0
# 로그 기록
log_line = f"\n{now.strftime('%Y-%m-%d %H:%M:%S')}, Server:{g_nServerCnt}, Elapsed:{dElapsedTime:.3f}, perServer:{per_server:.3f}\n"
print(log_line.strip())
g_fCheckLog.write(log_line)
g_fCheckLog.close()
# 스레드별로 각자 맡은 대상 서버에 접속해 정보 수집, 로그DB에 기록
def ThreadProcessList(nOrder: int):
global dbHost, g_IntervalServer, g_strCurrTime, g_strCurrDate, g_nCount
# Log DB 연결
try:
log_db = pymysql.connect(
host=MYSQL_LOG_HOST,
user=MYSQL_LOG_USER,
password=MYSQL_LOG_PASSWD,
database=MYSQL_LOG_DB,
port=MYSQL_LOG_PORT,
charset='utf8',
cursorclass=pymysql.cursors.Cursor,
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
print(f"Failed to connect to meta database: Error: {e}")
time.sleep(1)
return
# 테이블명 생성 (YYYYMMDD_HH 형식)
# 테이블 생성은 INSERT 직전에 수행
table_date_time = g_strCurrTime[:8] # YYYYMMDD
table_hour = g_strCurrTime[8:10] # HH
processlist_table = f"processlist_{table_date_time}_{table_hour}"
innodb_table = f"innodb_{table_date_time}_{table_hour}"
innodb_lock_table = f"innodb_lock_{table_date_time}_{table_hour}"
# 서버 범위 가져오기
nStart = g_IntervalServer[nOrder].nStart
nEnd = g_IntervalServer[nOrder].nEnd
for i in range(nStart, nEnd + 1):
host = dbHost[i].host
port = dbHost[i].port
ip = dbHost[i].ip
service = dbHost[i].service
# 대상 서버 접속 시도
try:
svc_db = pymysql.connect(
host=ip,
user=MYSQL_USER,
password=MYSQL_PASSWD,
database='mysql',
port=port,
charset='utf8',
cursorclass=pymysql.cursors.Cursor,
connect_timeout=g_nTimeout
)
except pymysql.MySQLError as e:
if port != 0:
print(f" > [{host}:{port}], Failed to connect to database: Error: {e}")
continue
# 특정 서버 제외 (ifsr-dev*)
if host.startswith("ifsr-dev"):
svc_db.close()
continue
try:
# 쿼리 실행: processlist, innodb status, version, innodb lock
# ================================
# SHOW FULL PROCESSLIST
# ================================
cursor = None
try:
cursor = svc_db.cursor() # 대상 서버 커서(쿼리 결과를 순회하는 객체) 생성
except Exception as e:
print(f" * processList cursor create failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
svc_db.close()
continue
try:
cursor.execute("SHOW FULL PROCESSLIST") # C 코드에선 strProcess 문자열을 사용했으나, 여기선 바로 쿼리문을 넣음. 가독성
result = cursor.fetchall()
except Exception as e:
print(f" * Query processList Result result is null: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
svc_db.close()
continue # goto QUERYERROR;과 같은 효과
if not result:
print(f" * processList mysql_num_rows is Zero: {host} {port}")
cursor.close()
svc_db.close()
continue
for row in result:
# row mapping (SHOW FULL PROCESSLIST)
# 0: Id, 1: User, 2: Host, 3: db, 4: Command, 5: Time, 6: State, 7: Info
# ----------------
# Filter conditions (C 코드 그대로)
# ----------------
if (
(row[1] == "system user" and (row[6] or "").startswith("Waiting for master")) or
row[1] == "event_scheduler" or
(row[4] == "Sleep" and int(row[5] or 0) < 20000) or
row[4] == "Binlog Dump"
):
continue
# ----------------
# 값 매핑
# ----------------
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
str_Pid = str(row[0] or "0")
str_user = row[1] or "NULL"
# ip:port → ip 분리
str_ip = row[2] or "NULL"
if ":" in str_ip:
str_ip = str_ip.split(":")[0]
str_db = row[3] or "NULL"
str_command = row[4] or "NULL"
str_ExecTime = str(row[5] or "0")
str_state = row[6] or "NULL"
str_query = row[7] or "NULL"
nCount = g_nCount
# ----------------
# Warning log 출력 (C 코드 조건 그대로)
# ----------------
if (
(str_state.startswith("Locked") and int(str_ExecTime) > 5) or
str_state.startswith("Killed") or
(str_state.startswith("Sending") and int(str_ExecTime) > 5) or
(str_state.startswith("Waiting") and int(str_ExecTime) > 5) or
(str_command.startswith("Query") and int(str_ExecTime) > 5) or
(str_state.startswith("Sorting") and int(str_ExecTime) > 5)
):
warn_msg = (
f"\n {str_host}:{nPort} [{service}] - "
f"State: {str_state}, Time: {str_ExecTime}, "
f"User: {str_user}@{str_ip}\n"
f" DataBase: {str_db}\n"
f" Query : {str_query}\n"
)
print(warn_msg, end="")
if g_fCheckLog:
g_fCheckLog.write(warn_msg)
# ----------------
# INSERT INTO processlist
# ----------------
try:
log_cursor = log_db.cursor()
except Exception as e:
print(f" * processlist insert cursor error: {e}")
continue
try:
# processlist 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_processlist = f"""
CREATE TABLE IF NOT EXISTS `{processlist_table}` (
`logtime` varchar(15) NOT NULL default '',
`logdate` varchar(15) NOT NULL default '',
`host` varchar(20) NOT NULL default '',
`port` int(11) NOT NULL default '0',
`pid` varchar(14) NOT NULL default '',
`user` varchar(20) default NULL,
`ip` varchar(21) NOT NULL default '',
`db` varchar(20) default NULL,
`command` varchar(30) default NULL,
`execTime` varchar(14) default NULL,
`state` varchar(30) default NULL,
`Query` blob,
`nOrder` int(11) NOT NULL default '0',
PRIMARY KEY (`logtime`,`host`,`port`,`ip`,`pid`,`nOrder`),
KEY `idx1` (`logdate`,`host`,`port`,`ip`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_processlist)
cursor_create.close()
log_cursor.execute(
f"""
INSERT INTO `{processlist_table}`
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,COMPRESS(%s),%s)
""",
(
str_logtime,
str_logdate,
str_host,
nPort,
str_Pid,
str_user,
str_ip,
str_db,
str_command,
str_ExecTime,
str_state,
str_query,
nCount,
),
)
log_db.commit()
except Exception as e:
print(f" * processlist insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close()
cursor.close()
# ================================
# SHOW ENGINE INNODB STATUS
# ================================
try:
cursor = svc_db.cursor()
cursor.execute("SHOW ENGINE INNODB STATUS")
result = cursor.fetchall()
except Exception as e:
print(f" * InnoDB Status query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
print(f" * InnoDB Status result is empty: {host} {port}")
cursor.close()
continue
for row in result:
# ----------------
# 버전별 InnoDB Status 위치 분기
# ----------------
if row[0] and row[0].startswith("InnoDB"):
# MySQL 5.1+
str_status = row[2] if row[2] else "NULL"
nFlag = 1
else:
# MySQL 5.0
str_status = row[0] if row[0] else "NULL"
nFlag = 0
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
# ----------------
# INSERT INTO innodb
# ----------------
try:
log_cursor = log_db.cursor()
except Exception as e:
print(f" * innodb insert cursor error: {e}")
cursor.close()
continue
try:
# innodb 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_innodb = f"""
CREATE TABLE IF NOT EXISTS `{innodb_table}` (
`logtime` varchar(15) NOT NULL default '',
`host` varchar(20) NOT NULL default '',
`port` int(11) NOT NULL default '0',
`nOrder` int(11) NOT NULL default '0',
`Status` blob,
PRIMARY KEY (`logtime`,`host`,`port`,`nOrder`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_innodb)
cursor_create.close()
log_cursor.execute(
f"""
INSERT INTO `{innodb_table}`
VALUES (%s, %s, %s, %s, COMPRESS(%s))
""",
(
str_logtime,
str_host,
nPort,
nCount,
str_status,
),
)
log_db.commit()
except Exception as e:
print(f" * innodb insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close()
cursor.close()
# ================================
# SHOW MYSQL VERSION
# ================================
try:
cursor = svc_db.cursor()
cursor.execute("SELECT VERSION()")
result = cursor.fetchone()
except Exception as e:
print(f" * VERSION query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
print(f" * VERSION result is empty: {host} {port}")
cursor.close()
continue
str_version = result[0]
cursor.close()
# ----------------
# MySQL 버전 파싱
# ----------------
# 예: "5.7.35-log" or "8.0.32"
if str_version.startswith("5.7") or str_version.startswith("5.6"):
str_lock_query = """
SELECT r.trx_mysql_thread_id as waiting_thread,
r.trx_id as waiting_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(r.trx_wait_started) as waiting_time,
r.trx_query as waiting_query,
b.trx_mysql_thread_id as blocking_thread,
b.trx_id as blocking_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(b.trx_started) as blocking_time,
b.trx_query as blocking_query
FROM information_schema.innodb_lock_waits w
JOIN information_schema.innodb_trx b ON b.trx_id = w.blocking_trx_id
JOIN information_schema.innodb_trx r ON r.trx_id = w.requesting_trx_id
"""
elif str_version.startswith("8.0"):
str_lock_query = """
SELECT r.trx_mysql_thread_id AS waiting_thread,
r.trx_id AS waiting_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(r.trx_wait_started) AS waiting_time,
r.trx_query AS waiting_query,
b.trx_mysql_thread_id AS blocking_thread,
b.trx_id AS blocking_trx_id,
UNIX_TIMESTAMP(NOW()) - UNIX_TIMESTAMP(b.trx_started) AS blocking_time,
b.trx_query AS blocking_query
FROM performance_schema.data_lock_waits dw
JOIN information_schema.innodb_trx r ON r.trx_id = dw.REQUESTING_ENGINE_TRANSACTION_ID
JOIN information_schema.innodb_trx b ON b.trx_id = dw.BLOCKING_ENGINE_TRANSACTION_ID
"""
else:
print(f" * MySQL Version [{str_version}] not supported for LOCK analysis: {host} {port}")
continue
# ================================
# SHOW INNODB LOCK
# ================================
if nFlag == 0: # MySQL 5.0 이하는 LOCK 분석 스킵
continue
try:
cursor = svc_db.cursor()
cursor.execute(str_lock_query)
result = cursor.fetchall()
except Exception as e:
print(f" * InnoDB Lock query failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
cursor.close()
continue
if not result:
print(f" * InnoDB Lock result is empty: {host} {port}")
cursor.close()
continue
for row in result:
str_logtime = g_strCurrTime
str_logdate = g_strCurrDate
str_host = host
nPort = port
try:
log_cursor = log_db.cursor()
except Exception as e:
print(f" * innodb_lock insert cursor error: {e}")
continue
try:
# innodb_lock 테이블 생성 (테이블이 없는 경우)
cursor_create = log_db.cursor()
create_innodb_lock = f"""
CREATE TABLE IF NOT EXISTS `{innodb_lock_table}` (
`logtime` varchar(15) NOT NULL DEFAULT '',
`logdate` varchar(15) NOT NULL DEFAULT '',
`host` varchar(20) NOT NULL DEFAULT '',
`port` int(11) NOT NULL DEFAULT '0',
`waiting_trx_id` varchar(18),
`waiting_thread_id` bigint(21) unsigned,
`waiting_time` int(11),
`waiting_query` varchar(1024),
`blocking_trx_id` varchar(18),
`blocking_thread_id` bigint(21) unsigned,
`blocking_time` int(11),
`blocking_query` varchar(1024),
KEY( host, port, logdate, logtime)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
"""
cursor_create.execute(create_innodb_lock)
cursor_create.close()
waiting_thread = int(row[0]) if row[0] else 0
waiting_trx_id = row[1] or "0"
waiting_time = int(row[2]) if row[2] else 0
waiting_query = row[3] or "0"
blocking_thread = int(row[4]) if row[4] else 0
blocking_trx_id = row[5] or "0"
blocking_time = int(row[6]) if row[6] else 0
blocking_query = row[7] or "0"
log_cursor.execute(
f'''
INSERT INTO `{innodb_lock_table}`
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
''',
(
str_logtime,
str_logdate,
str_host,
nPort,
waiting_trx_id,
waiting_thread,
waiting_time,
waiting_query,
blocking_trx_id,
blocking_thread,
blocking_time,
blocking_query,
)
)
log_db.commit()
except Exception as e:
print(f" * innodb_lock insert failed: {host} {port}")
print(f" * MySQL Error Message: {e}")
finally:
log_cursor.close() # log_db 커서 종료
cursor.close() # 대상 서버 커서 종료
finally: # 예외 발생 여부와 상관없이 항상 접속 종료
svc_db.close() # 대상 서버 접속 종료
log_db.close() # Log DB 접속 종료
# 시그널 핸들러: SIGALRM 수신 시 ThreadMain 호출
def signal_handler(signum, frame):
global g_nCount
ThreadMain(1)
g_nCount += 1
# 프로그램 진입점. signal handler 설정, 주기적 호출 준비
# ThreadMain 함수 호출
# 실행 명령어 python process2.py admin01 실행 시, argv[1]은 "admin01"
def main():
global strUserID, g_nCount
if len(sys.argv) != 2:
print("Usage: python process2.py <admin_id>")
sys.exit(1)
strUserID = sys.argv[1]
# 최초 1회 수집 수행
ThreadMain(1)
# 디버그 모드: 무한히 직접 호출
if DEBUG:
while True:
ThreadMain(1)
g_nCount += 1
time.sleep(INTERVALTIME_SEC) # C 코드에선 sleep 없이 바로 호출했으나, 여기선 약간의 지연 추가
return
# 시그널 핸들러 등록
signal.signal(signal.SIGALRM, signal_handler)
while True:
# INTERVALTIME_SEC 초 후 SIGALRM 발생 예약
signal.alarm(INTERVALTIME_SEC)
signal.pause() # SIGALRM 대기
if __name__ == "__main__":
main()'인턴' 카테고리의 다른 글
| [스터디1] Docker (1) | 2026.01.25 |
|---|---|
| [과제6-1] 웹 띄우기: MyMon Dev환경 세팅(웹 소스코드 도커라이징) (0) | 2026.01.21 |
| [과제5-5] DB 모니터링 프로그램 Python 변환 5편: Python 의존성 패키지 설치, 코드 실행 테스트 (0) | 2026.01.17 |
| [과제5-4] DB 모니터링 프로그램 Python 변환 4편: Python 코드 작성 (0) | 2026.01.17 |
| [과제5-3] DB 모니터링 프로그램 Python 변환 3편: MySQL Client 설치, C 의존성 패키지 설치, C 코드 실행 테스트 (0) | 2026.01.17 |