Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。
如何序列化SQLAlchemy查询结果为JSON格式?
我试过jsonpickle。编码,但它编码查询对象本身。
我尝试了json.dumps(items),但它返回
TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable
将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。
我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。
需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)
Python 3.7+和Flask 1.1+可以使用内置的数据类包
from dataclasses import dataclass
from datetime import datetime
from flask import Flask, jsonify
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app)
@dataclass
class User(db.Model):
id: int
email: str
id = db.Column(db.Integer, primary_key=True, auto_increment=True)
email = db.Column(db.String(200), unique=True)
@app.route('/users/')
def users():
users = User.query.all()
return jsonify(users)
if __name__ == "__main__":
users = User(email="user1@gmail.com"), User(email="user2@gmail.com")
db.create_all()
db.session.add_all(users)
db.session.commit()
app.run()
/users/路由现在将返回一个用户列表。
[
{"email": "user1@gmail.com", "id": 1},
{"email": "user2@gmail.com", "id": 2}
]
自动序列化相关模型
@dataclass
class Account(db.Model):
id: int
users: User
id = db.Column(db.Integer)
users = db.relationship(User) # User model would need a db.ForeignKey field
jsonify(account)的响应是这样的。
{
"id":1,
"users":[
{
"email":"user1@gmail.com",
"id":1
},
{
"email":"user2@gmail.com",
"id":2
}
]
}
覆盖默认的JSON编码器
from flask.json import JSONEncoder
class CustomJSONEncoder(JSONEncoder):
"Add support for serializing timedeltas"
def default(o):
if type(o) == datetime.timedelta:
return str(o)
if type(o) == datetime.datetime:
return o.isoformat()
return super().default(o)
app.json_encoder = CustomJSONEncoder
我建议用棉花糖。它允许您创建序列化器来表示支持关系和嵌套对象的模型实例。
以下是他们文档中的一个删节的例子。以ORM模型为例,作者:
class Author(db.Model):
id = db.Column(db.Integer, primary_key=True)
first = db.Column(db.String(80))
last = db.Column(db.String(80))
该类的棉花糖模式是这样构造的:
class AuthorSchema(Schema):
id = fields.Int(dump_only=True)
first = fields.Str()
last = fields.Str()
formatted_name = fields.Method("format_name", dump_only=True)
def format_name(self, author):
return "{}, {}".format(author.last, author.first)
...并像这样使用:
author_schema = AuthorSchema()
author_schema.dump(Author.query.first())
...会产生这样的输出:
{
"first": "Tim",
"formatted_name": "Peters, Tim",
"id": 1,
"last": "Peters"
}
看看他们完整的Flask-SQLAlchemy示例。
一个名为marshmlow - SQLAlchemy的库专门集成了SQLAlchemy和marshmallow。在这个库中,上面描述的Author模型的模式如下所示:
class AuthorSchema(ModelSchema):
class Meta:
model = Author
该集成允许从SQLAlchemy Column类型推断字段类型。
marshmallow-sqlalchemy这里。
这是一个JSONEncoder版本,它保留了模型列的顺序,只保留递归定义的列和关系字段。它还格式化了大多数不可序列化的JSON类型:
import json
from datetime import datetime
from decimal import Decimal
import arrow
from sqlalchemy.ext.declarative import DeclarativeMeta
class SQLAlchemyJSONEncoder(json.JSONEncoder):
"""
SQLAlchemy ORM JSON Encoder
If you have a "backref" relationship defined in your SQLAlchemy model,
this encoder raises a ValueError to stop an infinite loop.
"""
def default(self, obj):
if isinstance(obj, datetime):
return arrow.get(obj).isoformat()
elif isinstance(obj, Decimal):
return float(obj)
elif isinstance(obj, set):
return sorted(obj)
elif isinstance(obj.__class__, DeclarativeMeta):
for attribute, relationship in obj.__mapper__.relationships.items():
if isinstance(relationship.__getattribute__("backref"), tuple):
raise ValueError(
f'{obj.__class__} object has a "backref" relationship '
"that would cause an infinite loop!"
)
dictionary = {}
column_names = [column.name for column in obj.__table__.columns]
for key in column_names:
value = obj.__getattribute__(key)
if isinstance(value, datetime):
value = arrow.get(value).isoformat()
elif isinstance(value, Decimal):
value = float(value)
elif isinstance(value, set):
value = sorted(value)
dictionary[key] = value
for key in [
attribute
for attribute in dir(obj)
if not attribute.startswith("_")
and attribute != "metadata"
and attribute not in column_names
]:
value = obj.__getattribute__(key)
dictionary[key] = value
return dictionary
return super().default(obj)
虽然这是一篇老文章,也许我没有回答上面的问题,但我想谈谈我的连载,至少它对我有用。
我使用FastAPI,SqlAlchemy和MySQL,但我不使用orm模型;
# from sqlalchemy import create_engine
# from sqlalchemy.orm import sessionmaker
# engine = create_engine(config.SQLALCHEMY_DATABASE_URL, pool_pre_ping=True)
# SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
序列化代码
import decimal
import datetime
def alchemy_encoder(obj):
"""JSON encoder function for SQLAlchemy special classes."""
if isinstance(obj, datetime.date):
return obj.strftime("%Y-%m-%d %H:%M:%S")
elif isinstance(obj, decimal.Decimal):
return float(obj)
import json
from sqlalchemy import text
# db is SessionLocal() object
app_sql = 'SELECT * FROM app_info ORDER BY app_id LIMIT :page,:page_size'
# The next two are the parameters passed in
page = 1
page_size = 10
# execute sql and return a <class 'sqlalchemy.engine.result.ResultProxy'> object
app_list = db.execute(text(app_sql), {'page': page, 'page_size': page_size})
# serialize
res = json.loads(json.dumps([dict(r) for r in app_list], default=alchemy_encoder))
如果不行,请忽略我的回答。我在这里提到它
https://codeandlife.com/2014/12/07/sqlalchemy-results-to-json-the-easy-way/
我知道这是一个相当老的帖子。我采取了@SashaB给出的解决方案,并根据我的需要进行了修改。
我添加了以下内容:
字段忽略列表:序列化时要忽略的字段列表
字段替换列表:包含在序列化时要被值替换的字段名的字典。
删除方法和BaseQuery被序列化
我的代码如下:
def alchemy_json_encoder(revisit_self = False, fields_to_expand = [], fields_to_ignore = [], fields_to_replace = {}):
"""
Serialize SQLAlchemy result into JSon
:param revisit_self: True / False
:param fields_to_expand: Fields which are to be expanded for including their children and all
:param fields_to_ignore: Fields to be ignored while encoding
:param fields_to_replace: Field keys to be replaced by values assigned in dictionary
:return: Json serialized SQLAlchemy object
"""
_visited_objs = []
class AlchemyEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj.__class__, DeclarativeMeta):
# don't re-visit self
if revisit_self:
if obj in _visited_objs:
return None
_visited_objs.append(obj)
# go through each field in this SQLalchemy class
fields = {}
for field in [x for x in dir(obj) if not x.startswith('_') and x != 'metadata' and x not in fields_to_ignore]:
val = obj.__getattribute__(field)
# is this field method defination, or an SQLalchemy object
if not hasattr(val, "__call__") and not isinstance(val, BaseQuery):
field_name = fields_to_replace[field] if field in fields_to_replace else field
# is this field another SQLalchemy object, or a list of SQLalchemy objects?
if isinstance(val.__class__, DeclarativeMeta) or \
(isinstance(val, list) and len(val) > 0 and isinstance(val[0].__class__, DeclarativeMeta)):
# unless we're expanding this field, stop here
if field not in fields_to_expand:
# not expanding this field: set it to None and continue
fields[field_name] = None
continue
fields[field_name] = val
# a json-encodable dict
return fields
return json.JSONEncoder.default(self, obj)
return AlchemyEncoder
希望它能帮助到一些人!