blog.petitviolet.net

SQLAlchemyとmultiprocessingを使う

2014-01-17

QiitaPython

Python の便利なマルチプロセスライブラリである multiprocessing と SQLAlchemy を組み合わせて使用するサンプル プロセス間のデータのやり取りは Queue で行い、DB には SQLALchemy の session を使ってアクセスする

from multiprocessing import (Process, Queue)
from sqlalchemy import (create_engine, MetaData)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import (sessionmaker, scoped_session)


engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
# _Session = scoped_session(sessionmaker())
Session = sessionmaker()
metadata = MetaData(engine)
Base = declarative_base()
# Base.query = _Session.query_property()

####################
# テーブル定義は省略(Itemテーブル)
####################

class Hoge(object):
    def __init__(self):
        '''マルチプロセスに食わせるキューと吐き出す先のキュー'''
        self.in_queue = Queue()
        self.out_queue = Queue()
        self.session = Session()

    def __del__(self):
        self.session.commit()
        self.session.close()

    def get_items(self, worker_num=4):
        '''マルチプロセスでDBからデータを取得する
        キューに取得したいidを格納してマルチプロセスでDBにアクセスし、取り出す
        '''
        item_ids = self.session.query(Item.id).all()

        for item_id in item_ids:
            self.in_queue.put(item_id[0])

        jobs = []
        for i in xrange(worker_num):
            p = Process(target=self.worker, args=[])
            p.daemon = True
            jobs.append(p)
            p.start()

        for job in jobs:
            job.join()
            # プロセス終了メッセージ
            print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)

        return True

    def worker(self):
        '''マルチプロセス用ワーカー
        self.in_queueからidを取り出し、DBからそのデータを取得してself.out_queueに格納する
        '''
        while not self.in_queue.empty():
            id = self.in_queue.get()
            print id,
            try:
                item = self.session.query(Item).filter(Item.id == id).first()
            except Exception as e:
                print 'error =>', e
                continue
            self.out_queue.put(item.name)
            print 'in_queue: {0} (out_queue: {1})'\
                    .format(self.in_queue.qsize(), self.out_queue.qsize())
        return True  # これがないと終わらない

注意点

ここで scoped_session で Session を作成すると

Could not locate column in row for column …

といったエラーが吐かれてしまうことがあるので注意

また、

Traceback (most recent call last): File “/usr/lib/python2.7/multiprocessing/queues.py”, line 266, in _feed send(obj) IOError: [Errno 32] Broken pipe

というエラーも出たりしましたが、再現されなかったのでよく分かりませんでした…。

from: https://qiita.com/petitviolet/items/64965e9ba236ebbdd37f