SystemError - unknown opcode
SystemError - unknown opcode
This normally arises when using Dask on a cluster (it will not happen locally). The error will look something like this.
SystemError: unknown opcode
The most common cause is an inconsistent environment between the local machine and the cluster. The versions of Python, dask, and distributed on the local machine all need to match the versions on the cluster, because code is serialized with cloudpickle on the local side before it is sent to the cluster. The cluster then unpickles this code, and deserialization will produce invalid bytecode if the Python versions are inconsistent.
This GitHub issue replicates the problem by using a nested function. We can test this by running the following code on a Dask cluster. When using the code snippet below, configure the Client to point to your Dask cluster.
from dask.distributed import Client

# Insert your client here.
# NOTE(review): pass the address of your remote Dask cluster to Client();
# a bare Client() starts a local cluster, which will NOT reproduce the error.
client = Client()

from time import sleep
import numpy as np
from fugue import transform
import pandas as pd
import dask.dataframe as dd
def wrap():
    """Reproduce the 'unknown opcode' error via a nested function.

    The nested function below is serialized with cloudpickle on the client
    and deserialized on the workers; if the client and cluster run different
    Python versions, the workers raise ``SystemError: unknown opcode``.
    """
    # The comment below is parsed by fugue as the output schema of ppp —
    # it is functional, not decorative; do not remove or alter it.
    # schema: *,x:int
    def ppp(df:pd.DataFrame) -> pd.DataFrame:
        sleep(2)  # simulate per-partition work
        return df.assign(x=2)

    n=10000000
    df = pd.DataFrame(dict(
        a=np.random.rand(n),
        b=np.random.rand(n)
    ))
    # 16 partitions so the transform is distributed across cluster workers.
    ddf = dd.from_pandas(df, npartitions=16)
    # transform ships ppp to the workers; .compute() triggers execution there.
    return transform(ddf, ppp, engine="dask").compute()
This will give a traceback similar to the following.
SystemError Traceback (most recent call last)
<ipython-input-6-856eb59918a9> in <module>
----> 1 wrap()
<ipython-input-5-36bd46412ed1> in wrap()
22 ddf = dd.from_pandas(df, npartitions=16)
23
---> 24 return transform(ddf, ppp, engine="dask").compute()
/usr/local/lib/python3.9/site-packages/dask/base.py in compute(self, **kwargs)
286 dask.base.compute
287 """
--> 288 (result,) = compute(self, traverse=False, **kwargs)
289 return result
290
/usr/local/lib/python3.9/site-packages/dask/base.py in compute(*args, **kwargs)
568 postcomputes.append(x.__dask_postcompute__())
569
--> 570 results = schedule(dsk, keys, **kwargs)
571 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
572
/usr/local/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2691 should_rejoin = False
2692 try:
-> 2693 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2694 finally:
2695 for f in futures.values():
/usr/local/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1967 else:
1968 local_worker = None
-> 1969 return self.sync(
1970 self._gather,
1971 futures,
/usr/local/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
863 return future
864 else:
--> 865 return sync(
866 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
867 )
/usr/local/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
325 if error[0]:
326 typ, exc, tb = error[0]
--> 327 raise exc.with_traceback(tb)
328 else:
329 return result[0]
/usr/local/lib/python3.9/site-packages/distributed/utils.py in f()
308 if callback_timeout is not None:
309 future = asyncio.wait_for(future, callback_timeout)
--> 310 result[0] = yield future
311 except Exception:
312 error[0] = sys.exc_info()
/usr/local/lib/python3.9/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
/usr/local/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1832 exc = CancelledError(key)
1833 else:
-> 1834 raise exception.with_traceback(traceback)
1835 raise exc
1836 if errors == "skip":
/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/optimization.py in __call__()
/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/core.py in get()
/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/core.py in _execute_task()
/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/utils.py in apply()
/opt/conda/envs/coiled/lib/python3.8/site-packages/dask/dataframe/core.py in apply_and_enforce()
/usr/local/lib/python3.9/site-packages/fugue_dask/execution_engine.py in _map()
196 pdf.reset_index(drop=True), input_schema, pandas_df_wrapper=True
197 )
--> 198 if on_init_once is not None:
199 on_init_once(0, input_df)
200 cursor = partition_spec.get_cursor(input_schema, 0)
SystemError: unknown opcode
In the traceback above, notice there are two different Python versions: the client-side frames come from `/usr/local/lib/python3.9/...` while the worker-side frames come from `/opt/conda/envs/coiled/lib/python3.8/...`. Dask requires consistent Python versions between the client and the cluster.