flask 作为后端数据库操作是必要的,现在记录一下一些flask数据库的相关操作,
我将使用三种方法操作数据库
暂时使用较简单的sqlite作为例子
相关环境的安装
建议使用ve虚拟环境
sudo pacman -S sqlite # archlinux
sudo pip install virtualenv
# 在vertualenv环境下执行
pip install Flask-SQLAlchemy Jinja2 SQLAlchemy
最好是多看文档
1.使用sqlite3模块API
参考资料
这是最简单的方法,不仅适用于flask,python的其他方面也一样适用,如爬虫之类
连接数据库
#!/usr/bin/env python
# -*- coding=UTF-8 -*-
import sqlite3
database = /path/test.db #数据库文件路径
test = sqlite.connect('database') #连接数据库,如果数据库文件不存在则创建
print('connect database successfully')
test.close() #关闭数据库连接
如果将数据库名改为:memory:,则在内存中打开数据库而不是磁盘
创建表
database = /path/test.db
test = sqlite.connect('database')
test.execute('''CREATE TABLE BOOKS
(ID INT PRIMARY KEY NOT NULL,
TYPE TEXT NOT NULL,
NAME TEXT NOT NULL,
CONTENT TEXT);''')
print("Table created successfully")
test.close()
插入数据
database = /path/test.db
test = sqlite.connect('database')
test.execute("INSERT INTO BOOKS (ID,TYPE,NAME,CONTENT) \
VALUES (1, 'hello', 'world', 'helloworld')");
test.execute("INSERT INTO BOOKS (ID,TYPE,NAME,CONTENT) \
VALUES (2, 'goodbye', 'world', 'goodbyeworld')");
test.commit() #要使数据保存,必须提交
print("Records commited successfully")
test.close()
查询数据
database = /path/test.db
test = sqlite.connect('database')
cursor = test.execute("SELECT ID,TYPE,NAME,CONTENT from BOOKS")
for row in cursor:
print("ID =%d "%(row[0]))
print("TYPE =%s "%(row[1]))
print("NAME =%s "%(row[2]))
print("CONTENT =%s "%(row[3]))
test.close()
更新数据
database = /path/test.db
test = sqlite.connect('database')
test.execute("UPDATE BOOKS SET CONTENT = 'hello' WHERE ID=2")
test.commit
test.close()
删除数据
database = /path/test.db
test = sqlite.connect('database')
test.execute("DELETE FROM BOOKS WHERE ID=2")
test.commit
test.close()
由于数据库文件我已经在外部使用第一种方法创建,所以第二种方法我直接打开
2.使用文档上所说的方法
import sqlite3
from flask import g
DATABASE = '/path/to/database.db'
def connect_db():
return sqlite3.connect(DATABASE)
@app.before_request #使用app_request装饰器打开数据库
def before_request():
g.db = connect_db()
@app.teardown_request #使用app_request装饰器关闭数据库
def teardown_request(exception):
if hasattr(g, 'db'):
g.db.close()
def query_db(query, args=(), one=False): #数据库简化查询
cur = g.db.execute(query, args)
rv = [dict((cur.description[idx][0], value)
for idx, value in enumerate(row)) for row in cur.fetchall()]
return (rv[0] if rv else None) if one else rv
需要使用时(主要是查询)
for book in query_db('select * from BOOKS'):
print book['NAME'], 'has the id', book['ID']
#由于flask一般不使用print,可以这样
book = query_db('select * from BOOKS')
在模板中
{{ book.ID }}或着{{ book['ID'] }}
如果只希望得到一个单独的结果
book = query_db('select * from BOOKS where NAME = ?',
[the_bookname], one=True)
if book is None:
print 'No such user'
else:
print the_bookname, 'has the id', book['ID']
创建,更新,插入,删除数据请使用第一个方法
初始化数据库模型
from contextlib import closing
def init_db():
with closing(connect_db()) as db:
with app.open_resource('schema.sql') as f:
db.cursor().executescript(f.read())
db.commit()
3.使用Flask-SQLAlchemy扩展 (这应该是最推荐的方法)
一个最小应用
from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////tmp/test.db'
# sqlite打开的格式是sql:///三个"/",接着是数据库文件的**绝对路径**
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
email = db.Column(db.String(120), unique=True)
def __init__(self, username, email):
self.username = username
self.email = email
def __repr__(self):
return '<User %r>' % self.username
使用
假若上面代码保存为test.py
打开python shell
>>> from test import db,User
>>> db.create_all() #创建表
>>> admin = User('admin', '[email protected]') #创建数据
# 这时数据还未真正写入数据库,需要提交
>>> db.session.add(admin)
>>> db.session.commit() #这时数据已经写入数据库中
# 简单数据查询
>>> users = User.query.all()
>>> print(users)
[<User u'admin'>]
>>> admin = User.query.filter_by(username='admin').first()
>>> print(admin)
<User u'admin'>
如果想要简单的查看数据,推荐firefox的一个sqlite插件 sqlite manager
配置
SQLALCHEMY_DATABASE_URI #用于连接的数据库
SQLALCHEMY_BINDS #连接多个数据库
# 比如
SQLALCHEMY_BINDS = {
'users': 'mysqldb://localhost/users',
'appmeta': 'sqlite:////path/to/appmeta.db'
}
# 创建删除表
>>> db.create_all(bind=['users'])
>>> db.create_all(bind='appmeta')
# 引用绑定,使用 __bind_key__
class User(db.Model):
__bind_key__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True)
选择,插入,删除
插入
>>> from test import User
>>> me = User('admin', '[email protected]')
>>> db.session.add(me)
>>> db.session.commit()
删除
>>> db.session.delete(me)
>>> db.session.commit()
查询
首先插入如下数据
|id |username | email | |:-------:|:-------:|:---------------:| |1 |admin |[email protected]| |2 |peter |[email protected]| |3 |guest |[email protected]| 通过用户名查询用户:
>>> admin = User.query.filter_by(username='admin').first()
>>> print(admin.id)
1
>>> print(admin.email)
u'[email protected]'
查找不存在的用户名:
>>> missing = User.query.filter_by(username='missing').first()
>>> missing is None
True
使用更复杂的表达式查询一些用户:
>>> User.query.filter(User.email.endswith('@example.com')).all()
[<User u'admin'>, <User u'guest'>]
按某种规则对用户排序:
>>> User.query.order_by(User.username)
[<User u'admin'>, <User u'guest'>, <User u'peter'>]
限制返回用户的数量:
>>> User.query.limit(1).all()
[<User u'admin'>]
用主键查询用户:
>>> User.query.get(1)
<User u'admin'>
在视图中使用
使用 get_or_404() 来代替 get(),使用 first_or_404() 来代替 first()。 这样会抛出一个 404 错误,而不是返回 None:
@app.route('/user/<username>')
def show_user(username):
user = User.query.filter_by(username=username).first_or_404()
return render_template('show_user.html', user=user)
主要就是这样,最好看完整的文档
具体例子可以查看GitHub