SystemError - unknown opcode

This normally arises when using Dask on a cluster (it will not happen locally). The error will look something like this.

SystemError: unknown opcode

The most common cause is an inconsistent environment between the local machine and the cluster. The local versions of Python, dask, and distributed need to match the versions on the cluster because code is serialized with cloudpickle on the local side before it is sent to the cluster. The cluster then deserializes that code, and deserialization breaks when the Python versions differ: bytecode compiled by one Python version can contain opcodes that another version does not recognize, which is exactly what this error reports.
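A quick way to check whether the environments match is distributed's Client.get_versions, which reports the versions of Python and key packages on the client, the scheduler, and every worker, and raises an error when they are inconsistent. The snippet below is a minimal sketch; the empty Client() is a placeholder that you should point at your own cluster.

from dask.distributed import Client

# insert your client here; it should point to your Dask cluster
client = Client()

# reports Python, dask, distributed, etc. for the client, scheduler, and
# every worker; check=True raises an error if the versions do not match
versions = client.get_versions(check=True)
print(versions["client"])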

This GitHub issue replicates the problem by using a nested function. We can test this by running the following code on a Dask cluster. When using the code snippet below, configure the Client to point to your Dask cluster.

from time import sleep

import dask.dataframe as dd
import numpy as np
import pandas as pd
from dask.distributed import Client
from fugue import transform

# insert your client here; it should point to your Dask cluster
client = Client()

def wrap():
    # Fugue schema hint: keep all input columns and add an int column x
    # schema: *,x:int
    def ppp(df: pd.DataFrame) -> pd.DataFrame:
        sleep(2)
        return df.assign(x=2)

    # build a large pandas DataFrame and split it into 16 Dask partitions
    n = 10_000_000
    df = pd.DataFrame(dict(
        a=np.random.rand(n),
        b=np.random.rand(n)
    ))
    ddf = dd.from_pandas(df, npartitions=16)

    # ppp is defined inside wrap, and this nesting is what exposes the error
    return transform(ddf, ppp, engine="dask").compute()

wrap()

This will give a traceback similar to the following.

SystemError                               Traceback (most recent call last)
<ipython-input-6-856eb59918a9> in <module>
----> 1 wrap()

<ipython-input-5-36bd46412ed1> in wrap()
     22     ddf = dd.from_pandas(df, npartitions=16)
     23 
---> 24     return transform(ddf, ppp, engine="dask").compute()

/usr/local/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
    286         dask.base.compute
    287         """
--> 288         (result,) = compute(self, traverse=False, **kwargs)
    289         return result
    290 

/usr/local/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
    568         postcomputes.append(x.__dask_postcompute__())
    569 
--> 570     results = schedule(dsk, keys, **kwargs)
    571     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    572 

/usr/local/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2691                     should_rejoin = False
   2692             try:
-> 2693                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2694             finally:
   2695                 for f in futures.values():

/usr/local/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1967             else:
   1968                 local_worker = None
-> 1969             return self.sync(
   1970                 self._gather,
   1971                 futures,

/usr/local/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    863             return future
    864         else:
--> 865             return sync(
    866                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    867             )

/usr/local/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    325     if error[0]:
    326         typ, exc, tb = error[0]
--> 327         raise exc.with_traceback(tb)
    328     else:
    329         return result[0]

/usr/local/lib/python3.9/site-packages/distributed/utils.py in f()
    308             if callback_timeout is not None:
    309                 future = asyncio.wait_for(future, callback_timeout)
--> 310             result[0] = yield future
    311         except Exception:
    312             error[0] = sys.exc_info()

/usr/local/lib/python3.9/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/usr/local/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1832                             exc = CancelledError(key)
   1833                         else:
-> 1834                             raise exception.with_traceback(traceback)
   1835                         raise exc
   1836                     if errors == "skip":

/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/optimization.py in __call__()

/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/core.py in get()

/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/core.py in _execute_task()

/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/utils.py in apply()

/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()

/usr/local/lib/python3.9/site-packages/fugue_dask/execution_engine.py in _map()
    196                 pdf.reset_index(drop=True), input_schema, pandas_df_wrapper=True
    197             )
--> 198             if on_init_once is not None:
    199                 on_init_once(0, input_df)
    200             cursor = partition_spec.get_cursor(input_schema, 0)

SystemError: unknown opcode

In the traceback above, notice that two Python versions appear: some frames reference /usr/local/lib/python3.9/site-packages while others reference /opt/conda/envs/coiled/lib/python3.8/site-packages. Dask requires consistent Python versions between the client and the cluster.
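To confirm which interpreter each worker is actually running, you can execute a small check on the cluster. The sketch below is an illustration rather than part of the reproduction above: it assumes the Client points at the same cluster, and it deliberately ships the named standard-library function platform.python_version instead of a lambda, so the function is sent by reference and still runs even when the interpreters differ.

import platform
from dask.distributed import Client

# insert your client here; it should point to your Dask cluster
client = Client()

# the interpreter version on the client
print("client:", platform.python_version())

# client.run executes the function on every worker and returns a dict keyed
# by worker address, so mismatched workers are easy to spot
for worker, version in client.run(platform.python_version).items():
    print(worker, version)

Once the mismatched versions are identified, align the local environment with the cluster (or rebuild the cluster image to match the client) and rerun the code.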