python_mariadb
2022/08/02
5-minute read
倾盖如故
偶尔翻翻github的推荐,看到了https://github.com/chinese-poetry/chinese-poetry这样一个项目,诗词嗯,很棒,忽然就萌生了把玩的想法,于是也就有了这篇文章的开始。mariadb一直是我很喜欢的软件,工作以后越来越喜欢开源的一些工具了,用起来没有麻烦。
奇思妙想
起初的时候,想法很简单也很纯粹,就是把项目中的json文件的数据保存到数据库中。嗯,就是最简单的把它存进去,我也没想到会出来很多很多的问题。最初始版本的代码已经不见了,那时候连接mariadb的写法还是直接从官网默认的方式写的,后面发现需求不断地增加,代码越来越乱,做了一点改进,但好像并没有什么特别的地方,只是优化出了第一版,也暴露了一些没有考虑过的问题。完整代码如下:
import json
import logging
import os
import time
import mariadb
# config path
root_path = r'D:\chinese-poetry-master'
tangshi_path = os.path.join(root_path, r'json')
songci_path = os.path.join(root_path, r'ci')
huajian_path = os.path.join(root_path, r'wudai\huajianji')
shijing_path = os.path.join(root_path, r'shijing')
# config mariadb connect
pool = mariadb.ConnectionPool(
pool_name='my_pool',
pool_size=4,
host='127.0.0.1',
user='root',
password='root',
database='poem',
autocommit=True,
)
# config log
t = time.localtime(time.time())
cur_time = f'{t.tm_year:04}{t.tm_mon:02}{t.tm_mday:02}'
logging.basicConfig(format="%(asctime)s %(levelname)s [line:%(lineno)d] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S(%p)",
level=logging.DEBUG,
handlers=[logging.FileHandler(filename=cur_time + ".log", encoding='utf-8', mode='a+')])
def get_paths(path) -> list:
file_paths = []
files = os.listdir(path)
for file in files:
if '.json' in file:
file_paths.append(os.path.join(path, file))
if not file_paths:
logging.info(f"json file is None: {path}")
return file_paths
def save_to_mariadb(file_path, mtype=None):
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
flag = True
if mtype == 'songci':
for i in data:
m_rhythmic, m_author, m_paragraphs = i["rhythmic"], i["author"], ''.join(i["paragraphs"])
cursor.execute("select * from ci where rhythmic = ? and author = ? and paragraphs = ?",
(m_rhythmic, m_author, m_paragraphs))
if cursor.rowcount == 0:
cursor.execute("INSERT INTO ci (rhythmic,author,paragraphs) VALUES (?, ?, ?)",
(m_rhythmic, m_author, m_paragraphs))
elif cursor.rowcount != 1:
if flag:
logging.info(f"Duplicate records exist in the database. file path is {file_path}")
flag = False
[logging.info(i) for i in cursor]
...
elif mtype == 'author_ci' or mtype == 'author_shi':
for i in data:
if mtype == 'author_ci':
m_description, m_name, m_short_description = i["description"], i["name"], i["short_description"]
cursor.execute("select * from author where full_desc = ? and author = ? and short_desc = ?",
(m_description, m_name, m_short_description))
if cursor.rowcount == 0:
cursor.execute("INSERT INTO author (author,full_desc,short_desc) VALUES (?, ?, ?)",
(m_name, m_description, m_short_description))
elif cursor.rowcount != 1:
if flag:
logging.info(f"Duplicate records exist in the database.file path is {file_path}")
flag = False
[logging.info(i) for i in cursor]
elif mtype == 'author_shi':
m_desc, m_name, m_id = i["desc"], i["name"], i["id"]
cursor.execute("select * from author where full_desc = ? and author = ? and id = ?",
(m_desc, m_name, m_id))
if cursor.rowcount == 0:
cursor.execute("INSERT INTO author (author,full_desc,self_id) VALUES (?, ?, ?)",
(m_name, m_desc, m_id))
elif cursor.rowcount != 1:
if flag:
logging.info(f"Duplicate records exist in the database.file path is {file_path}")
flag = False
[logging.info(i) for i in cursor]
elif mtype == 'tangshi':
for i in data:
m_author, m_paragraphs, m_title, m_id = i["author"], ''.join(i["paragraphs"]), i["title"], i["id"]
cursor.execute("select * from shi where author = ? and paragraphs = ? and title = ? and self_id = ?",
(m_author, m_paragraphs, m_title, m_id))
if cursor.rowcount == 0:
cursor.execute("INSERT INTO shi (author,paragraphs,title,self_id) VALUES (?, ?, ?,?)",
(m_author, m_paragraphs, m_title, m_id))
elif cursor.rowcount != 1:
if flag:
logging.info(f"Duplicate records exist in the database. file path is {file_path}")
flag = False
[logging.info(i) for i in cursor]
else:
logging.debug("type out of range")
def json_file(file_paths: list):
for path in file_paths:
logging.debug(f"Start processing file:{path}")
if os.path.split(path)[0] == tangshi_path:
if "author" in os.path.basename(path):
save_to_mariadb(path, 'author_shi')
...
elif "poet" in os.path.basename(path):
save_to_mariadb(path, 'tangshi')
...
elif os.path.split(path)[0] == songci_path:
if os.path.basename(path) == "author.song.json":
save_to_mariadb(path, 'author_ci')
elif "ci.song" in os.path.basename(path):
save_to_mariadb(path, 'songci')
else:
logging.debug(f"the file: {os.path.basename(path)} out of program")
...
else:
logging.warning("Exceeds the processing scope of the program. Check whether the problem exists.")
def create_table(create=True) -> None:
"""
include create and drop
:param create: if you need drop table, make create false, be careful!!!
:return:None
"""
table_names = ['shi', 'ci', 'author']
commands = []
create_shi = """
create table if not exists shi(
id int not null auto_increment,
author varchar(255),
paragraphs text,
title varchar(255),
self_id varchar(255),
dynastic varchar(255),
primary key (id)
);
"""
create_ci = """
create table if not exists ci(
id int not null auto_increment,
author varchar(255),
paragraphs text,
rhythmic varchar(255),
primary key (id)
);
"""
create_author = """
create table if not exists author(
id int not null auto_increment,
author varchar(255),
full_desc text,
short_desc text,
self_id varchar(255),
dynastic varchar(255),
primary key (id)
);
"""
commands.append(create_shi)
commands.append(create_ci)
commands.append(create_author)
for i in commands:
try:
cursor.execute(i)
except mariadb.Error as e:
logging.error(f"Error create table {i}: {e}")
...
if not create:
for i in table_names:
command = "drop table " + i
try:
cursor.execute(command)
logging.warning(command)
except mariadb.Error as e:
logging.error(f"Error drop table {i}: {e}")
def core_exec():
json_file(get_paths(tangshi_path))
json_file(get_paths(songci_path))
...
if __name__ == '__main__':
conn = pool.get_connection()
cursor = conn.cursor()
create_table()
core_exec()
cursor.close()
conn.close()
...
梅开二度
上面的代码存在几个大的问题,当然,程序是可以正常跑的。对于sql这块我觉得写的很不好,维护起来会特别的困难,于是在思考如何优化。也就有了第一次尝试改进
def create_table():
table_name = ['shi', 'ci', 'author']
table_struct_shi_ci = """id int not null auto_increment,title varchar(255),author varchar(255),dynastic varchar(255),
content text,self_id varchar(255),primary key(id)"""
table_struct_author = """id int not null auto_increment,author varchar(255),dynastic varchar(255),full_desc text,
short_desc text primary key (id) """
table_struct = [table_struct_shi_ci, table_struct_shi_ci, table_struct_author]
table_dict = dict(zip(table_name, table_struct))
for key, value in table_dict.items():
sql_command = f"""create table if not exists {key} ( {value} );"""
print(sql_command)
return table_name, table_struct, table_dict
def insert_table(table_name, table_fields, table_values):
...
之所以insert会卡住,是因为列名没有处理好,所以在思考,后面忽然想到,为什么不用类呢?于是代码就变成:
class Table(object):
"""
how to use:
ci = Table()
column = ["id", "title", "author", "dynastic", "content", "self_id"]
column_desc = ["int not null auto_increment", "varchar(255)", "varchar(255)", "varchar(255)", "text", "varchar(255)"]
ci.set_all("ci", "id", dict(zip(column, column_desc)))
m_dict = {"title": "望岳", "author": "杜甫", "awb": "haha"}
ci.sql2insert(m_dict)
"""
table_name = None
primary_key = None
column_desc = None
sql_command = None
def set_all(self, table_name: str, primary_key: str, column_desc: dict):
self.table_name = table_name
self.primary_key = primary_key
self.column_desc = column_desc
def sql2create(self):
sql_str = ""
for key, value in self.column_desc.items():
sql_str = sql_str + key + " " + value + ","
self.sql_command = f"create table if not exists {self.table_name} ({sql_str} primary key ({self.primary_key}));"
return self.sql_command
def sql2drop(self):
self.sql_command = f"drop table {self.table_name}"
return self.sql_command
def sql2insert(self, data_dict):
column_key = [key for key in self.column_desc.keys()]
insert_key, insert_value = [], []
for key, value in data_dict.items():
if key in column_key:
insert_key.append(key)
insert_value.append(value)
else:
logging.debug(f"Error,this:{key}={value} will be ignored. Input data is:{data_dict}")
self.sql_command = f"insert into {self.table_name} ({','.join(insert_key)}) values ({','.join(insert_value)})"
return self.sql_command
事实上,一开始我只是想把数据放进去,已经创建表和删除表,至于insert完全是后面加的了,这时候我脑海其实已经有了把crud写完的想法了,但是,没有需求,所以就暂时不写了。因为我在思考ORM,是的,思考比实现更重要,我选择偷懒~~~