Sign up to take part
Registered users can ask their own questions, contribute to discussions, and be part of the Community!
Registered users can ask their own questions, contribute to discussions, and be part of the Community!
Hi,
I have a Streamlit Code Studio web-app running that generates dataframes. At the end of the web-app UI, a SUBMIT button is clicked and a function is run to do the following:
1. Duplicate an existing project template (works)
2. Set variables for the duplicated project (works)
3. Create a schema to write the dataframes' data to tables (doesn't work)
The code attempts to write the dataframes but returns this error (the exact same error occurs with either the 'write_dataframe' or the 'write_with_schema' function):
Exception: Error : [Errno 32] Broken pipe
Traceback:
  File "/home/dataiku/workspace/code_studio-versioned/streamlit/admin.py", line 146, in _create_yrr_instance
    app_input_selected_tmp_lots.write_dataframe(combined_selected_df, infer_schema=True)
  File "/opt/dataiku/python/dataiku/core/dataset.py", line 1071, in write_dataframe
    writer.write_dataframe(df)
  File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 600, in __exit__
    self.close()
  File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 592, in close
    self.remote_writer.flush()
  File "/opt/dataiku/python/dataiku/core/dataset_write.py", line 159, in flush
    raise Exception(self.error_message)
When I check the duplicated project, the dataset is created but is empty. The schema is correct, the pre-write statement is correct, just no data. Code is below, thx much.
def _create_yrr_instance(user,
                         tmp_psf,
                         start_date,
                         end_date,
                         tmp_tx_included_lots,
                         tmp_tx_excluded_lots=None,
                         stage='DEV',
                         overwrite=False):
    """Duplicate the YRR_TEMPLATE project and load the selected lots into it.

    Steps, in order:
      1. Build the target project key from ``stage``, ``tmp_psf``, the two
         dates and (when lots are excluded) an MD5 digest of the excluded
         lot IDs.
      2. Delete an existing project with that key when ``overwrite`` is
         true; otherwise report the clash and stop.
      3. Duplicate ``YRR_TEMPLATE`` under the new key and set its standard
         project variables.
      4. Create the managed Snowflake dataset
         ``app_input_selected_tmp_lots`` with a pre-write statement that
         creates the schema.
      5. Concatenate the included-lot dataframes and write them to the
         dataset.

    Args:
        user: Zero-argument callable; its return value is stored in the
            ``initiator`` project variable.
        tmp_psf: Value embedded in the project key and stored in the
            ``tmp_psf`` project variable.
        start_date: Object exposing ``isoformat()`` (e.g. a ``date``).
        end_date: Object exposing ``isoformat()`` (e.g. a ``date``).
        tmp_tx_included_lots: Mapping whose values are the dataframes to
            concatenate and write.
        tmp_tx_excluded_lots: Optional object with a ``LOT_ID`` column
            supporting ``tolist()`` (presumably a DataFrame -- confirm
            against the caller).
        stage: Environment tag used in the project key, Snowflake database
            name and role name.
        overwrite: When true, an existing project with the same key is
            deleted first.

    Returns:
        The new project key on success, or ``None`` when a step failed and
        was reported through ``alert_user`` / ``st.write``.

    Note:
        ``project.delete()`` and ``pd.concat`` are not guarded; exceptions
        they raise propagate to the caller.
    """
    start_date = start_date.isoformat().replace('-', '_')
    end_date = end_date.isoformat().replace('-', '_')
    now = datetime.now().isoformat(' ', 'seconds')
    client = dataiku.api_client()

    try:
        yrr_template_project = client.get_project("YRR_TEMPLATE")
    except Exception:
        alert_user(
            msg="Could not find the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return None

    project_keys = client.list_project_keys()

    # Derive a stable suffix from the excluded lots so that two instances
    # differing only in their exclusions get distinct project keys.
    if tmp_tx_excluded_lots is None:
        excluded_lot_id = ''
    else:
        # str() keeps string IDs unchanged and avoids a TypeError from
        # str.join when the LOT_ID column holds non-string values.
        excluded_lot_id = ','.join(map(str, tmp_tx_excluded_lots.LOT_ID.tolist()))
    if excluded_lot_id:
        digest = hashlib.md5(excluded_lot_id.encode('utf-8', 'ignore')).hexdigest()
        excluded_lot_hash_id = f"_{digest}"
    else:
        excluded_lot_hash_id = ''

    # Named project_key rather than `id` to avoid shadowing the builtin.
    project_key = f"YRR_{stage}_{tmp_psf}_{start_date}_{end_date}{excluded_lot_hash_id}"

    if project_key in project_keys:
        if not overwrite:
            alert_user(
                msg=f"Cannot create YRR instance, a project with the same ID '{project_key}' exists, set the 'overwrite' argument to True",
                level='error'
            )
            return None
        # Delete an existing project with the same name
        client.get_project(project_key).delete()

    try:
        copy_result = yrr_template_project.duplicate(target_project_key=project_key,
                                                     target_project_name=project_key)
    except Exception:
        alert_user(
            msg="Could not duplicate the YRR_TEMPLATE project, this is a bug, report it to the developers!",
            level='error'
        )
        return None

    try:
        copied_project = client.get_project(copy_result['targetProjectKey'])
    except Exception:
        alert_user(
            msg="Could not get the project handle for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return None

    try:
        variables = copied_project.get_variables()
    except Exception:
        alert_user(
            msg="Could not get variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return None

    schema_name = project_key.replace(f"YRR_{stage}_", '')
    user_role = f"YRR_USER_{stage}"
    variables["standard"]["snowflake_db"] = f"YRR_{stage}"
    variables["standard"]["snowflake_role"] = user_role
    variables["standard"]["snowflake_wh"] = "XSMALL_USER_WH"
    variables["standard"]["snowflake_schema"] = schema_name
    variables["standard"]["snowflake_schema_path"] = f"YRR_{stage}.{schema_name}"
    variables["standard"]["tmp_psf"] = tmp_psf
    variables["standard"]["created_on"] = now
    variables["standard"]["initiator"] = user()
    variables["standard"]["tmp_tx_start_date"] = start_date
    variables["standard"]["tmp_tx_end_date"] = end_date

    try:
        copied_project.set_variables(variables)
    except Exception:
        alert_user(
            msg="Could not set variables for the duplicated YRR project, this is a bug, report it to the developers!",
            level='error'
        )
        return None

    # Create the datasets for the selected and excluded TMP lots
    params = {
        'connection': 'GOQ_SNOWFLAKE_VAR',
        'mode': 'table',
        'table': 'APP_INPUT_SELECTED_TMP_LOTS',
        'catalog': '${snowflake_db}',
        'schema': '${snowflake_schema}',
        'tableCreationMode': 'auto',
    }
    try:
        dataset_handle = copied_project.create_dataset(
            'app_input_selected_tmp_lots', type='Snowflake', params=params
        )
    except Exception:
        st.write('create dataset failed')
        return None

    try:
        ds_def = dataset_handle.get_definition()
    except Exception:
        st.write('get dataset def failed')
        return None

    ds_def['managed'] = True
    try:
        dataset_handle.set_definition(ds_def)
    except Exception:
        st.write('set dataset def failed')
        return None

    # Setup the pre-write SQL statements
    try:
        settings = dataset_handle.get_settings()
    except Exception as e:
        st.write(e)
        st.write('getting dataset settings failed')
        return None

    # The ${...} placeholders are left literal in this string; they are the
    # project variables set above, not Python format fields.
    pre_write_statements = "create schema if not exists ${snowflake_db}.${snowflake_schema};"
    settings.get_raw()["params"]["customPreWriteStatements"] = pre_write_statements
    try:
        settings.save()
    except Exception as e:
        st.write(e)
        st.write('saving dataset settings failed')
        return None

    try:
        app_input_selected_tmp_lots = dataiku.Dataset(
            "app_input_selected_tmp_lots", project_key=project_key, ignore_flow=True
        )
    except Exception as e:  # was `except exception`, which raised NameError
        st.write(e)
        st.write('get dataset write handle failed')
        return None

    # Combine the selected lots into a single dataframe
    dataframes = tmp_tx_included_lots.values()
    combined_selected_df = pd.concat(dataframes, axis=0, ignore_index=True)

    try:
        app_input_selected_tmp_lots.write_with_schema(combined_selected_df)
    except Exception as e:
        st.write(e)
        st.write('write to dataset failed')
        return None

    return project_key
Operating system used: Windows 10
Operating system used: Windows 10
I have verified that the pre-write statement is not working to create the schema before trying to write the dataframe to the Snowflake table. I altered my code to use an internal library to write the schema manually and then the dataset was created successfully.