ORM 对象关系映射
类 ---------->映射---------> 表
对象 ------>映射----------> 一条记录
对象点属性 --->映射---> 记录对应的值
为了让类(表)实例化的时候接收更多的k=v 让它继承一个字典
表查询(select * from 表名 | select * from 表名 where k= v)
表更新(update 表名 set name= ? ,age=?)
表插入(insert into 表名(name,age) values('xxx',18))
基本上都是关键字
所以继承字典的类 自己定义一个init(self,**kwargs)
然后在继承字典的 init
为了可以通过点属性的方式取值和赋值 定义__getattr__ __setattr__
class Models(dict,metaclass=MyMetaClass): def __init__(self,**kwargs): super().__init__(**kwargs) def __getattr__(self, item): return self.get(item,'没有这个键') def __setattr__(self, key, value): self[key] = value
以后自己定义的类只需要继承models
为了限制类的创建过程让它继承一个元类(为了不那么麻烦 让models继承元类,后面类只需要继承models就可以了)
# 它是用来拦截跟数据库中表对应的类的创建class MyMetaClass(type): def __new__(cls, class_name, class_bases, class_attrs): # 我们定义的元类是用来拦截模型表的创建过程,而models并不是一张模型表,所以不需要它的创建过程 if class_name == 'Models': return type.__new__(cls,class_name,class_bases,class_attrs) table_name = class_attrs.get('table_name',class_name) primary_key = None mappings = {} # 下面的for循环需要做两件事 # 1.将单个单个的字段整合成一个 # 2.确定当前表当地哪个字段是主键 for k,v in class_attrs.items(): # k:id,name v:IntegerField(),StringField() # 拿出所有自己定义的表的字段属性 if isinstance(v,Field): # 将所有的自己定义的表的字段存入字典中 mappings[k] = v if v.primary_key: # 健壮性校验一张表不能有多个主键 if primary_key: raise TypeError("一张表只能有一个主键") primary_key = v.name # 循环mapping拿到所有的自定义字段名 for k in mappings.keys(): # 将单个单个的字段删除 class_attrs.pop(k) # 校验用户自定义的模型表是否指定了主键字段 if not primary_key: raise TypeError("一张表必须要有主键") # 将标示表的特征信息 表名,表的主键字段,表的其他字段都塞到类的名称空间中 class_attrs['table_name'] = table_name class_attrs['primary_key'] = primary_key class_attrs['mappings'] = mappings return type.__new__(cls, class_name, class_bases, class_attrs)
一张表 有表单 名字 类型 是否是主键 以及默认值
# 表的字段通常需要有的属,字段类性字段名型,是否是主键,默认值class Field(object): def __init__(self,name, column_type, primary_key, default): self.name = name self.column_type = column_type self.primary_key = primary_key self.default = default# 定义varchar字段类型class StringField(Field): def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None): super().__init__(name,column_type,primary_key,default)# 定义int字段类型class IntegerField(Field): def __init__(self, name, column_type='int', primary_key=False, default=0): super().__init__(name, column_type, primary_key, default)
表已经设置好了 然后就是 增 改 查
class Models(dict, metaclass=MyMetaClass): def __init__(self, **kwargs): super().__init__(**kwargs) def __getattr__(self, item): return self.get(item,'没有该键!') def __setattr__(self, key, value): self[key] = value @classmethod def select(cls,**kwargs): # id=1,name='jason',password='123' ms = Mysql() # select * from %s if not kwargs: sql = "select * from %s"%cls.table_name res = ms.select(sql) else: # select * from %s where %s=%s k = list(kwargs.keys())[0] v = kwargs.get(k) sql = "select * from %s where %s=?"%(cls.table_name,k) # select * from user where id=? sql = sql.replace('?','%s') # select * from user where id=%s res = ms.select(sql,v) if res: # res = [{},{},{}] # cls(name='...',password='...') return [cls(**r) for r in res] # [obj1,obj2,obj3] def update(self): ms = Mysql() # update user set name='jason',password='123' where id = 1 # update user set name=%s,password=%s where id = 1 # 定义一个列表存储该表的所有字段名 fields = [] # 定义一个变量用来存储当前数据对象的主键值 pr = None values = [] for k,v in self.mappings.items(): # 先把当前数据对象对应的主键值拿到 if v.primary_key: pr = getattr(self,v.name,v.default) else: # 除了主键之外的所有字段 fields.append(v.name+'=?') # [name=?,password=?...] values.append(getattr(self,v.name,v.default)) sql = "update %s set %s where %s=%s"%(self.table_name,','.join(fields),self.primary_key,pr) # update user set name=?,password=? where id=1 sql = sql.replace('?','%s') # update user set name=%s,password=%s where id=1 ms.execute(sql,values) def save(self): ms = Mysql() # insert into user(name,password) values('jason','123') fields = [] # 专门用来存储与字段对应数量的? args = [] values = [] for k,v in self.mappings.items(): # name:StringField(name='name') if not v.primary_key: # 将id字段去除 因为id字段是自增,不需要人为的去操作 fields.append(v.name) args.append('?') values.append(getattr(self,v.name,v.default)) sql = "insert into %s(%s) values(%s)"%(self.table_name,','.join(fields),','.join(args)) # insert into user(name,password) values(?,?) sql = sql.replace("?",'%s') # insert into user(name,password) values(%s,%s) ms.execute(sql,values)
from DBUtils.PooledDB import PooledDBimport pymysqlPOOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=3, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='', database='youku', charset='utf8', autocommit='True')
import pymysqlfrom orm_pool.db_pool import POOLclass Mysql(object): def __init__(self): self.conn = POOL.connection() self.cursor = self.conn.cursor(pymysql.cursors.DictCursor) def close_db(self): self.cursor.close() self.conn.close() def select(self,sql,args=None): self.cursor.execute(sql,args) res = self.cursor.fetchall() # 注意一点:fetchall拿到的数据结构是一个列表套字典[{},{},{}] return res def execute(self,sql,args): # insert into user(name,password) values('jason','123') # update user set name='jason',passsword='456' where id=1 try: self.cursor.execute(sql, args) except BaseException as e: print(e)
from orm_singleton.mysql_singleton import Mysql# 表的字段通常需要有的属,字段类性字段名型,是否是主键,默认值class Field(object): def __init__(self,name, column_type, primary_key, default): self.name = name self.column_type = column_type self.primary_key = primary_key self.default = default# 定义varchar字段类型class StringField(Field): def __init__(self, name, column_type='varchar(255)', primary_key=False, default=None): super().__init__(name,column_type,primary_key,default)# 定义int字段类型class IntegerField(Field): def __init__(self, name, column_type='int', primary_key=False, default=0): super().__init__(name, column_type, primary_key, default)# 它是用来拦截跟数据库中表对应的类的创建class MyMetaClass(type): def __new__(cls, class_name, class_bases, class_attrs): # 我们定义的元类是用来拦截模型表的创建过程,而models并不是一张模型表,所以不需要它的创建过程 if class_name == 'Models': return type.__new__(cls,class_name,class_bases,class_attrs) table_name = class_attrs.get('table_name',class_name) primary_key = None mappings = {} # 下面的for循环需要做两件事 # 1.将单个单个的字段整合成一个 # 2.确定当前表当地哪个字段是主键 for k,v in class_attrs.items(): # k:id,name v:IntegerField(),StringField() # 拿出所有自己定义的表的字段属性 if isinstance(v,Field): # 将所有的自己定义的表的字段存入字典中 mappings[k] = v if v.primary_key: # 健壮性校验一张表不能有多个主键 if primary_key: raise TypeError("一张表只能有一个主键") primary_key = v.name # 循环mapping拿到所有的自定义字段名 for k in mappings.keys(): # 将单个单个的字段删除 class_attrs.pop(k) # 校验用户自定义的模型表是否指定了主键字段 if not primary_key: raise TypeError("一张表必须要有主键") # 将标示表的特征信息 表名,表的主键字段,表的其他字段都塞到类的名称空间中 class_attrs['table_name'] = table_name class_attrs['primary_key'] = primary_key class_attrs['mappings'] = mappings return type.__new__(cls, class_name, class_bases, class_attrs)class Models(dict, metaclass=MyMetaClass): def __init__(self, **kwargs): super().__init__(**kwargs) def __getattr__(self, item): return self.get(item,'没有该键!') def __setattr__(self, key, value): self[key] = value @classmethod def select(cls,**kwargs): # id=1,name='jason',password='123' ms = Mysql() # select * from %s if not kwargs: sql = "select * from %s"%cls.table_name res = ms.select(sql) else: # select * from %s where %s=%s k = list(kwargs.keys())[0] v = kwargs.get(k) sql = "select * from %s where %s=?"%(cls.table_name,k) # select * from user where id=? sql = sql.replace('?','%s') # select * from user where id=%s res = ms.select(sql,v) if res: # res = [{},{},{}] # cls(name='...',password='...') return [cls(**r) for r in res] # [obj1,obj2,obj3] def update(self): ms = Mysql() # update user set name='jason',password='123' where id = 1 # update user set name=%s,password=%s where id = 1 # 定义一个列表存储该表的所有字段名 fields = [] # 定义一个变量用来存储当前数据对象的主键值 pr = None values = [] for k,v in self.mappings.items(): # 先把当前数据对象对应的主键值拿到 if v.primary_key: pr = getattr(self,v.name,v.default) else: # 除了主键之外的所有字段 fields.append(v.name+'=?') # [name=?,password=?...] values.append(getattr(self,v.name,v.default)) sql = "update %s set %s where %s=%s"%(self.table_name,','.join(fields),self.primary_key,pr) # update user set name=?,password=? where id=1 sql = sql.replace('?','%s') # update user set name=%s,password=%s where id=1 ms.execute(sql,values) def save(self): ms = Mysql() # insert into user(name,password) values('jason','123') fields = [] # 专门用来存储与字段对应数量的? args = [] values = [] for k,v in self.mappings.items(): # name:StringField(name='name') if not v.primary_key: # 将id字段去除 因为id字段是自增,不需要人为的去操作 fields.append(v.name) args.append('?') values.append(getattr(self,v.name,v.default)) sql = "insert into %s(%s) values(%s)"%(self.table_name,','.join(fields),','.join(args)) # insert into user(name,password) values(?,?) sql = sql.replace("?",'%s') # insert into user(name,password) values(%s,%s) ms.execute(sql,values)