Django可以很好地自动序列化从DB返回到JSON格式的ORM模型。

如何序列化SQLAlchemy查询结果为JSON格式?

我试过jsonpickle。编码,但它编码查询对象本身。 我尝试了json.dumps(items),但它返回

TypeError: <Product('3', 'some name', 'some desc')> is not JSON serializable

将SQLAlchemy ORM对象序列化为JSON /XML真的那么难吗?它没有任何默认序列化器吗?现在序列化ORM查询结果是非常常见的任务。

我所需要的只是返回SQLAlchemy查询结果的JSON或XML数据表示。

需要在javascript datagird中使用JSON/XML格式的SQLAlchemy对象查询结果(JQGrid http://www.trirand.com/blog/)


当前回答

你可以像这样使用SqlAlchemy的自省:

mysql = SQLAlchemy()
from sqlalchemy import inspect

class Contacts(mysql.Model):  
    __tablename__ = 'CONTACTS'
    id = mysql.Column(mysql.Integer, primary_key=True)
    first_name = mysql.Column(mysql.String(128), nullable=False)
    last_name = mysql.Column(mysql.String(128), nullable=False)
    phone = mysql.Column(mysql.String(128), nullable=False)
    email = mysql.Column(mysql.String(128), nullable=False)
    street = mysql.Column(mysql.String(128), nullable=False)
    zip_code = mysql.Column(mysql.String(128), nullable=False)
    city = mysql.Column(mysql.String(128), nullable=False)
    def toDict(self):
        return { c.key: getattr(self, c.key) for c in inspect(self).mapper.column_attrs }

@app.route('/contacts',methods=['GET'])
def getContacts():
    contacts = Contacts.query.all()
    contactsArr = []
    for contact in contacts:
        contactsArr.append(contact.toDict()) 
    return jsonify(contactsArr)

@app.route('/contacts/<int:id>',methods=['GET'])
def getContact(id):
    contact = Contacts.query.get(id)
    return jsonify(contact.toDict())

从下面的答案中得到启发: 将sqlalchemy行对象转换为python dict

其他回答

你可以把你的对象输出为一个字典:

class User:
   def as_dict(self):
       return {c.name: getattr(self, c.name) for c in self.__table__.columns}

然后使用User.as_dict()序列化对象。

如将sqlalchemy行对象转换为python dict中所述

(Sasha B的回答非常棒)

这特别地将datetime对象转换为字符串,在原始答案中将转换为None:

# Standard library imports
from datetime import datetime
import json

# 3rd party imports
from sqlalchemy.ext.declarative import DeclarativeMeta

class JsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj.__class__, DeclarativeMeta):
            dict = {}

            # Remove invalid fields and just get the column attributes
            columns = [x for x in dir(obj) if not x.startswith("_") and x != "metadata"]

            for column in columns:
                value = obj.__getattribute__(column)

                try:
                    json.dumps(value)
                    dict[column] = value
                except TypeError:
                    if isinstance(value, datetime):
                        dict[column] = value.__str__()
                    else:
                        dict[column] = None
            return dict

        return json.JSONEncoder.default(self, obj)

在Flask下,它工作并处理datatime字段,转换类型字段 “时间”:datetime。Datetime(2018, 3, 22, 15, 40)成 “时间”:“2018-03-22 15:40:00”:

obj = {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

# This to get the JSON body
return json.dumps(obj)

# Or this to get a response object
return jsonify(obj)

更详细的解释。 在你的模型中,添加:

def as_dict(self):
       return {c.name: str(getattr(self, c.name)) for c in self.__table__.columns}

str()是针对python3的,所以如果使用python2则使用unicode()。它应该有助于反序列化日期。如果不处理这些,你可以删除它。

现在可以像这样查询数据库

some_result = User.query.filter_by(id=current_user.id).first().as_dict()

需要First()来避免奇怪的错误。As_dict()现在将反序列化结果。反序列化之后,就可以将其转换为json了

jsonify(some_result)
class SqlToDict:
    def __init__(self, data) -> None:
        self.data = data

    def to_timestamp(self, date):
        if isinstance(date, datetime):
            return int(datetime.timestamp(date))
        else:
            return date

    def to_dict(self) -> List:
        arr = []
        for i in self.data:
            keys = [*i.keys()]
            values = [*i]
            values = [self.to_timestamp(d) for d in values]
            arr.append(dict(zip(keys, values)))
        return arr

例如:

SqlToDict(data).to_dict()