SQLAlchemyとmultiprocessingを使う
2014-01-17
QiitaPythonPython の便利なマルチプロセスライブラリである multiprocessing と SQLAlchemy を組み合わせて使用するサンプル プロセス間のデータのやり取りは Queue で行い、DB には SQLALchemy の session を使ってアクセスする
from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)
engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker()
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()
####################
# テーブル定義は省略(Itemテーブル)
####################
class Hoge(object):
def __init__(self):
'''マルチプロセスに食わせるキューと吐き出す先のキュー'''
self.in_queue = Queue()
self.out_queue = Queue()
self.session = Session()
def __del__(self):
self.session.commit()
self.session.close()
def get_items(self, worker_num=4):
'''マルチプロセスでDBからデータを取得する
キューに取得したいidを格納してマルチプロセスでDBにアクセスし、取り出す
'''
item_ids = self.session.query(Item.id).all()
for item_id in item_ids:
self.in_queue.put(item_id[0])
jobs = []
for i in xrange(worker_num):
p = Process(target=self.worker, args=[])
p.daemon = True
jobs.append(p)
p.start()
for job in jobs:
job.join()
# プロセス終了メッセージ
print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)
return True
def worker(self):
'''マルチプロセス用ワーカー
self.in_queueからidを取り出し、DBからそのデータを取得してself.out_queueに格納する
'''
while not self.in_queue.empty():
id = self.in_queue.get()
print id,
try:
item = self.session.query(Item).filter(Item.id == id).first()
except Exception as e:
print 'error =>', e
continue
self.out_queue.put(item.name)
print 'in_queue: {0} (out_queue: {1})'\
.format(self.in_queue.qsize(), self.out_queue.qsize())
return True # これがないと終わらない
注意点
ここで scoped_session で Session を作成すると
Could not locate column in row for column …
といったエラーが吐かれてしまうことがあるので注意
また、
Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) IOError: [Errno 32] Broken pipe
というエラーも出たりしましたが、再現されなかったのでよく分かりませんでした…。
from: https://qiita.com/petitviolet/items/64965e9ba236ebbdd37f