
python - Renaming spark output csv in azure blob storage

I have a Databricks notebook set up that works as follows (sketched in code after the list):

  • PySpark connection details to the blob storage account
  • read the file into a Spark dataframe
  • convert to a pandas df
  • data modelling on the pandas df
  • convert back to a Spark dataframe
  • write to blob storage as a single file
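
A minimal sketch of that flow, assuming the connection setup and paths shown further below; the dataframe names and the model() step are placeholders, and spark is the session Databricks provides:

# read from blob storage into a Spark dataframe
df_spark = (spark.read.format("csv")
            .option("header", "true")
            .load(file_location))

# convert to pandas, run the modelling step, convert back
df_pandas = df_spark.toPandas()
df_pandas = model(df_pandas)               # placeholder modelling function
df_out = spark.createDataFrame(df_pandas)

# write back as a single file (this is where the naming problem appears)
df_out.coalesce(1).write.mode("overwrite") \
    .option("header", "true").csv(file_location_new)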

My problem is that you cannot name the output file, and I need a static CSV filename.

Is there a way to rename this in PySpark?

## Blob Storage account information
storage_account_name = ""
storage_account_access_key = ""

## File location and File type
file_location = "path/.blob.core.windows.net/Databricks_Files/input"
file_location_new = "path/.blob.core.windows.net/Databricks_Files/out"
file_type = "csv"

## Connection string to connect to blob storage
spark.conf.set(
  "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
  storage_account_access_key)

This is followed by writing the file out after the data transformation:

dfspark.coalesce(1).write.format('com.databricks.spark.csv') \
  .mode('overwrite').option("header", "true").save(file_location_new)

The file is then written as "part-00000-tid-336943946930983.....csv", whereas the goal is to have "Output.csv".
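
Listing the output directory confirms this: Spark writes a directory, with the part file alongside its bookkeeping files. A small sketch using dbutils on Databricks and the path from above:

# expect _SUCCESS / _committed_* / _started_* entries and one part-*.csv
for f in dbutils.fs.ls(file_location_new):
    print(f.name, f.size)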

Another approach I looked at was recreating this in plain Python, but I have not yet found in the documentation how to write the file back to blob storage.

I know from the Microsoft docs that the method to retrieve from Blob storage is .get_blob_to_path.
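
For reference, a sketch of that route with the legacy azure-storage SDK (v2.x), which is where .get_blob_to_path lives; the same BlockBlobService class also has create_blob_from_path for the upload direction. The container and blob names here are made-up placeholders:

from azure.storage.blob import BlockBlobService

blob_service = BlockBlobService(account_name=storage_account_name,
                                account_key=storage_account_access_key)

# download: blob -> local file
blob_service.get_blob_to_path("mycontainer",
                              "Databricks_Files/input/data.csv",
                              "/tmp/data.csv")

# upload: local file -> blob, with a static name
blob_service.create_blob_from_path("mycontainer",
                                   "Databricks_Files/out/Output.csv",
                                   "/tmp/Output.csv")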

Any help here is greatly appreciated.


1 Answer


coalesce and repartition do not help with saving the dataframe as one normally named file: Spark always writes out a directory of part files, so the part file has to be renamed afterwards.

I ended up just renaming the single CSV file and deleting the folder with the log files:

import os

def save_csv(df, location, filename):
    # Spark can only write a *directory* of part files, so write to a temp dir first
    outputPath = os.path.join(location, filename + '_temp.csv')

    df.repartition(1).write \
        .format("com.databricks.spark.csv") \
        .mode("overwrite") \
        .option("header", "true") \
        .option("delimiter", ",") \
        .save(outputPath)

    # list the written files through the local /dbfs mount
    csv_files = os.listdir(os.path.join('/dbfs', outputPath))

    # move the single part-xxxx .csv into the normally named file,
    # then delete the temp folder with the _SUCCESS / log files
    for file in csv_files:
        if file.endswith('.csv'):
            dbutils.fs.mv(os.path.join(outputPath, file),
                          os.path.join(location, filename))
    dbutils.fs.rm(outputPath, True)

# using save_csv
save_csv_location = 'mnt/.....'
save_csv(df, save_csv_location, 'name.csv')
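
An alternative some use is to do the rename through the Hadoop FileSystem API via Spark's JVM gateway, avoiding the /dbfs local mount entirely. A sketch under that assumption; the paths are placeholders, and _jsc / _jvm are private PySpark attributes:

# rename the part file with the Hadoop FileSystem API; placeholder paths
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path

src_dir = Path("dbfs:/mnt/out/name.csv_temp.csv")
fs = src_dir.getFileSystem(hadoop_conf)

# pick the single part file out of the directory Spark wrote
part_file = [s.getPath() for s in fs.listStatus(src_dir)
             if s.getPath().getName().startswith("part-")][0]
fs.rename(part_file, Path("dbfs:/mnt/out/name.csv"))
fs.delete(src_dir, True)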
