petitviolet blog

    SQLAlchemyとmultiprocessingを使う

    2014-01-17

    QiitaPython

    Python の便利なマルチプロセスライブラリである multiprocessing と SQLAlchemy を組み合わせて使用するサンプル プロセス間のデータのやり取りは Queue で行い、DB には SQLALchemy の session を使ってアクセスする

    from multiprocessing import (Process, Queue)
    from sqlalchemy import (create_engine, MetaData)
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import (sessionmaker, scoped_session)
    
    
    engine = create_engine("mysql://{user}:{passwd}@{host}/{db}")
    # _Session = scoped_session(sessionmaker())
    Session = sessionmaker()
    metadata = MetaData(engine)
    Base = declarative_base()
    # Base.query = _Session.query_property()
    
    ####################
    # テーブル定義は省略(Itemテーブル)
    ####################
    
    class Hoge(object):
        def __init__(self):
            '''マルチプロセスに食わせるキューと吐き出す先のキュー'''
            self.in_queue = Queue()
            self.out_queue = Queue()
            self.session = Session()
    
        def __del__(self):
            self.session.commit()
            self.session.close()
    
        def get_items(self, worker_num=4):
            '''マルチプロセスでDBからデータを取得する
            キューに取得したいidを格納してマルチプロセスでDBにアクセスし、取り出す
            '''
            item_ids = self.session.query(Item.id).all()
    
            for item_id in item_ids:
                self.in_queue.put(item_id[0])
    
            jobs = []
            for i in xrange(worker_num):
                p = Process(target=self.worker, args=[])
                p.daemon = True
                jobs.append(p)
                p.start()
    
            for job in jobs:
                job.join()
                # プロセス終了メッセージ
                print '{name}.exitcode = {code}'.format(name=job.name, code=job.exitcode)
    
            return True
    
        def worker(self):
            '''マルチプロセス用ワーカー
            self.in_queueからidを取り出し、DBからそのデータを取得してself.out_queueに格納する
            '''
            while not self.in_queue.empty():
                id = self.in_queue.get()
                print id,
                try:
                    item = self.session.query(Item).filter(Item.id == id).first()
                except Exception as e:
                    print 'error =>', e
                    continue
                self.out_queue.put(item.name)
                print 'in_queue: {0} (out_queue: {1})'\
                        .format(self.in_queue.qsize(), self.out_queue.qsize())
            return True  # これがないと終わらない
    

    注意点

    ここで scoped_session で Session を作成すると

    Could not locate column in row for column …

    といったエラーが吐かれてしまうことがあるので注意

    また、

    Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/queues.py", line 266, in _feed send(obj) IOError: [Errno 32] Broken pipe

    というエラーも出たりしましたが、再現されなかったのでよく分かりませんでした…。

    from: https://qiita.com/petitviolet/items/64965e9ba236ebbdd37f