I'm able to establish a connection to my Databricks FileStore (DBFS) and access the FileStore.
Reading, writing, and transforming data with PySpark works, but when I try to use a local Python API such as pathlib or the os module, I am unable to get past the first level of the DBFS file system.
I can use a magic command:
%fs ls dbfs:/mnt/my_fs/...
which works perfectly and lists all the child directories, but if I run
os.listdir('/dbfs/mnt/my_fs/')
it returns ['mount.err'].
I've tested this on a new cluster and the result is the same.
I'm using Python on Databricks Runtime Version 6.1 with Apache Spark 2.4.4.
Is anyone able to advise?
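For reference, a single notebook cell that reproduces the comparison (using the mount path from above):

import os
from pathlib import Path

# DBFS path as seen by Spark / dbutils; this lists the child directories correctly
display(dbutils.fs.ls("dbfs:/mnt/my_fs/"))

# The same path through the local-file (FUSE) view used by os and pathlib
print(os.listdir("/dbfs/mnt/my_fs/"))            # returns ['mount.err'] on my cluster
print(list(Path("/dbfs/mnt/my_fs/").iterdir()))

On a healthy mount both views show the same children; here only the dbutils call does.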
Connection script:
I've used the Databricks CLI library to store my credentials, which are formatted according to the Databricks documentation:
def initialise_connection(secrets_func):
    configs = secrets_func()

    # Check if the mount exists
    bMountExists = False
    for item in dbutils.fs.ls("/mnt/"):
        if str(item.name) == r"WFM/":
            bMountExists = True

    # Drop the mount if it exists, to refresh credentials
    if bMountExists:
        dbutils.fs.unmount("/mnt/WFM")
        bMountExists = False

    # Mount the drive
    if not bMountExists:
        dbutils.fs.mount(
            source="adl://test.azuredatalakestore.net/WFM",
            mount_point="/mnt/WFM",
            extra_configs=configs
        )
        print("Drive mounted")
    else:
        print("Drive already mounted")
We experienced this issue when the same container was mounted to two different paths in the workspace. Unmounting all and remounting resolved our issue. We were using Databricks version 6.2 (Spark 2.4.4, Scala 2.11). Our blob store container config:
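The exact values aren't reproduced here, but a typical blob store container mount on that runtime looks roughly like the following (storage account, container, mount, and secret names are placeholders):

dbutils.fs.mount(
    source="wasbs://<container-name>@<storage-account>.blob.core.windows.net",
    mount_point="/mnt/<mount-name>",
    extra_configs={
        "fs.azure.account.key.<storage-account>.blob.core.windows.net":
            dbutils.secrets.get(scope="<scope-name>", key="<key-name>")
    }
)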
Notebook script to unmount all mounts in /mnt:
# Iterate through all mounts and unmount everything under /mnt/
print('Unmounting all mounts beginning with /mnt/')
display(dbutils.fs.mounts())
for mount in dbutils.fs.mounts():
    if mount.mountPoint.startswith('/mnt/'):
        dbutils.fs.unmount(mount.mountPoint)

# Re-list all mount points
print('Re-listing all mounts')
display(dbutils.fs.mounts())
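After remounting the container (the same dbutils.fs.mount pattern as in the sketch above), a quick check through the local file API should list real contents rather than mount.err:

import os
print(os.listdir('/dbfs/mnt/<mount-name>'))  # expect the container's directories, not ['mount.err']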
Assuming you have a separate process to create the mounts, create a job definition (job.json) to run the Python script on an automated cluster:
{
    "name": "Minimal Job",
    "new_cluster": {
        "spark_version": "6.2.x-scala2.11",
        "spark_conf": {},
        "node_type_id": "Standard_F8s",
        "driver_node_type_id": "Standard_F8s",
        "num_workers": 2,
        "enable_elastic_disk": true,
        "spark_env_vars": {
            "PYSPARK_PYTHON": "/databricks/python3/bin/python3"
        }
    },
    "timeout_seconds": 14400,
    "max_retries": 0,
    "spark_python_task": {
        "python_file": "dbfs:/minimal/job.py"
    }
}
Python file (job.py) to print out the mounts:
import os

path_mounts = '/dbfs/mnt/'
print(f"Listing contents of {path_mounts}:")
print(os.listdir(path_mounts))

path_mount = path_mounts + 'YOURCONTAINERNAME'
print(f"Listing contents of {path_mount}:")
print(os.listdir(path_mount))
Run the following Databricks CLI commands to run the job. View the Spark driver logs for the output, confirming that mount.err does not exist.
databricks fs mkdirs dbfs:/minimal
databricks fs cp job.py dbfs:/minimal/job.py --overwrite
databricks jobs create --json-file job.json
databricks jobs run-now --job-id <JOBID FROM LAST COMMAND>
We have experienced the same issue when connecting to an Azure Gen2 storage account (without hierarchical namespaces).
The error seems to occur when switching the Databricks Runtime Environment from 5.5 to 6.x. However, we have not been able to pinpoint the exact reason for this. We assume some functionality might have been deprecated.