Python multiprocessing, can't pickle thread.lock (pymongo.Cursor)

First, let me assure you I read all the relevant answers and they don't work for me.

I am using a multiprocessing Pool to parallelize my data creation, with MongoDB 5.0 and the pymongo client.

As you can see, I am initializing the Mongo client in the worker, as suggested by the available answers, but I still get:

TypeError: cannot pickle '_thread.lock' object
Exception ignored in: <function CommandCursor.__del__ at 0x7f96f6fff160>
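
To be clear about what the message itself means (as far as I understand it): anything that holds a _thread.lock, which MongoClient and its cursors do internally, fails to pickle with exactly this error. A standalone toy illustration, unrelated to my actual code:

import pickle
import threading

class HoldsLock:
    """Toy stand-in for any object that keeps a lock internally (as MongoClient does)."""
    def __init__(self):
        self._lock = threading.Lock()

try:
    pickle.dumps(HoldsLock())
except TypeError as exc:
    print(exc)  # prints: cannot pickle '_thread.lock' object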

Is there a way I can use multiprocessing with pymongo.Cursor? Any help will be appreciated.
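
For reference, the pattern I understood from those answers is that only plain, picklable data should cross the process boundary, with each worker opening its own client and materializing its cursor into a list before returning. Roughly like this sketch (the database/collection names match mine, the ticker chunks are just placeholders, and it assumes a local mongod on the default port):

from multiprocessing import Pool
from pymongo import MongoClient

def fetch_chunk(listings_chunk):
    # Each worker process opens its own client; only plain lists/dicts
    # are passed in or returned, never the client or a cursor.
    client = MongoClient()
    try:
        cursor = client["stock_signals"].candles.find({"listing": {"$in": listings_chunk}})
        return list(cursor)  # materialize before crossing the process boundary
    finally:
        client.close()

if __name__ == "__main__":
    chunks = [["AAPL", "MSFT"], ["GOOG", "AMZN"]]  # placeholder listings
    with Pool(2) as pool:
        candle_chunks = pool.map(fetch_chunk, chunks)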

This is the function that calls the Pool:

def get_all_valid_events(
        event_criteria:str,
        all_listings:List[str],
        earnings:List[Dict[str,Any]],
        days_around_earnings=0,
        debug=False,
        poolsize=10,
        chunk_size=100,
        lookback=30,
        lookahead=0
):
    start = time.perf_counter()
    listings = Manager().list(all_listings.copy())
    valid_events = []
    if debug:
        for i in range(ceil(len(listings)/chunk_size)):
            valid_events += get_valid_event_dates_by_listing(event_criteria, listings[i*chunk_size:(i+1)*chunk_size], earnings, days_around_earnings, debug)
    else:
        payload = list()
        for i in range(ceil(len(listings)/chunk_size)):
            payload.append(
                [
                    event_criteria,
                    listings[i*chunk_size:(i+1)*chunk_size],
                    earnings,
                    days_around_earnings,
                    debug,
                    lookback,
                    lookahead
                ]
            )

        with ThreadPool(poolsize) as pool:
            valid_events = pool.starmap(get_valid_event_dates_by_listing, payload)

    print(f"getting all valid true events took {time.perf_counter() - start} sec")

    return valid_events

And this is the worker function:

def get_valid_event_dates_by_listing(
        event_criteria:str,
        listings:List[str],
        earnings_list,
        days_around_earnings=0,
        debug=False,
        lookback=30,
        lookahead=0
) -> List[Tuple[Tuple[str, datetime], int]]:

    #TODO: generalize event filter
    start = time.perf_counter()
    client = MongoClient()
    db = client['stock_signals']
    cursor_candles_by_listing = db.candles.find(
        {'listing': {'$in': listings}},
        {'_id':0, 'listing':1, 'date':1,'position':1, 'PD_BBANDS_6_lower':1, 'close':1, 'PD_BBANDS_6_upper':1}
    )
    candles = list(cursor_candles_by_listing)
    df = pd.DataFrame(candles).dropna()
    minimum_position_dict = dict(df.groupby('listing').min()['position']) # We need the minimum position by listing to filter only events that have lookback

    # Filter only the dates that satisfy the criteria
    lte_previous_bb_6_lower = df['close'] <= df[f"{event_criteria}_lower"].shift()
    gte_previous_bb_6_upper = df['close'] >= df[f"{event_criteria}_upper"].shift()

    potential_true_events_df = df[lte_previous_bb_6_lower | gte_previous_bb_6_upper]
    potential_false_events_df = df.drop(potential_true_events_df.index)
    potential_true_event_dates = potential_true_events_df[['listing', 'date', 'position']].values
    actual_true_event_dates = earning_helpers.filter_event_dates_by_earnings_and_position(potential_true_event_dates, earnings_list, minimum_position_dict, days_around_earning=days_around_earnings, lookback=lookback)
    true_event_dates = [((event_date[0], event_date[1], event_date[2]), 1) for event_date in actual_true_event_dates]

    potential_false_event_dates = potential_false_events_df[['listing', 'date', 'position']].values
    actual_false_event_dates = _random_false_events_from_listing_df(potential_false_event_dates, len(actual_true_event_dates), earnings_list, minimum_position_dict, days_around_earnings, lookback)
    false_events_dates = [((event_date[0], event_date[1], event_date[2]), 0) for event_date in actual_false_event_dates]
    all_event_dates = true_event_dates + false_events_dates
    shuffle(all_event_dates)
    print(f"getting a true sequence for listing took {time.perf_counter() - start} sec")
    return all_event_dates

And this is my main:

from utils import event_helpers, earning_helpers
from utils.queries import get_candle_listing

if __name__ == "__main__":
    all_listings = get_candle_listing.get_listings()
    earnigns = earning_helpers.get_all_earnings_dates()

    res = event_helpers.get_all_valid_events('PD_BBANDS_6', all_listings, earnigns, 2, chunk_size=100)

Full stack trace:

 File "test_multiprocess.py", line 8, in <module>
    res = event_helpers.get_all_valid_events('PD_BBANDS_6', all_listings, earnigns, 2, chunk_size=100)
  File "/media/data/projects/ml/signal_platform/utils/event_helpers.py", line 53, in get_all_valid_events
    valid_events = pool.starmap(get_valid_event_dates_by_listing, payload)
  File "/home/froy001/.asdf/installs/python/3.8.12/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/froy001/.asdf/installs/python/3.8.12/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "/home/froy001/.asdf/installs/python/3.8.12/lib/python3.8/multiprocessing/pool.py", line 537, in _handle_tasks
    put(task)
  File "/home/froy001/.asdf/installs/python/3.8.12/lib/python3.8/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/home/froy001/.asdf/installs/python/3.8.12/lib/python3.8/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object
Exception ignored in: <function CommandCursor.__del__ at 0x7f46e91e21f0>
Traceback (most recent call last):
  File "/home/froy001/.cache/pypoetry/virtualenvs/signal-platform-31MTNyCe-py3.8/lib/python3.8/site-packages/pymongo/command_cursor.py", line 68, in __del__
  File "/home/froy001/.cache/pypoetry/virtualenvs/signal-platform-31MTNyCe-py3.8/lib/python3.8/site-packages/pymongo/command_cursor.py", line 83, in __die
  File "/home/froy001/.cache/pypoetry/virtualenvs/signal-platform-31MTNyCe-py3.8/lib/python3.8/site-packages/pymongo/mongo_client.py", line 1696, in _cleanup_cursor
  File "/home/froy001/.cache/pypoetry/virtualenvs/signal-platform-31MTNyCe-py3.8/lib/python3.8/site-packages/pymongo/client_session.py", line 466, in _end_session
  File "/home/froy001/.cache/pypoetry/virtualenvs/signal-platform-31MTNyCe-py3.8/lib/python3.8/site-packages/pymongo/client_session.py", line 871, in in_transaction
  File "/home/froy001/.cache/pypoetry/virtualenvs/signal-platform-31MTNyCe-py3.8/lib/python3.8/site-packages/pymongo/client_session.py", line 362, in active
AttributeError: 'NoneType' object has no attribute 'STARTING'

Update: 01-23

I tried using the multiprocess library (which serializes with dill instead of pickle), but it didn't help.
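
For completeness, that attempt was essentially a drop-in swap of the pool import for the dill-based one; a minimal standalone sketch of the swap (with a toy worker in place of mine):

# multiprocess is a fork of multiprocessing that serializes with dill
# instead of pickle (pip install multiprocess).
from multiprocess import Pool

def toy_worker(criteria, chunk):
    return [(criteria, item) for item in chunk]

if __name__ == "__main__":
    payload = [("PD_BBANDS_6", ["a", "b"]), ("PD_BBANDS_6", ["c"])]
    with Pool(2) as pool:
        results = pool.starmap(toy_worker, payload)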

Tags: mongodb, pymongo, python-multiprocessing
