# Below is a collection of solutions for interacting with folders and files on a
# local file system. Searching these topics yields many possible approaches; the
# options chosen here aim to be the most robust, fastest, and memory efficient.
# Written by: Mark W Kiehl
# http://mechatronicsolutionsllc.com/
# http://www.savvysolutions.info/savvycodesolutions/
# File system solutions/examples:
# Get the current script folder.
# Define a file from a specific drive and path on the Windows OS and get information about it.
# Get the path to the user's home directory.
# Determine what the operating system (OS) is.
# Create a temporary subfolder within the OS temporary folder, create a new temporary file, delete both the temporary file and subfolder.
# Get a random subfolder from a folder.
# Get a random file from a folder.
# Get all of the subfolders within a folder
# Get all of the subfolders within a folder recursively
# Get all of the files in folder
# Get all of the files in all subfolders from within a folder
# Write, read, append to a CSV file
# Write & read to a Pickle file
# Write and read to a JSON file
# Write and read to INI file
# MIT License
#
# Copyright (c) 2024 Mechatronic Solutions LLC
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
__version__ = "0.0.1"
# 0.0.0 First release of get_folders_or_files(), and examples on file IO.
# 0.0.1 Write, read, append to a CSV file.
# Write and read to a Pickle file.
# Write and read to a Parquet file.
# Use DuckDB as a transformation engine to read multiple CSV files and save the data to a Parquet file.
from pathlib import Path
print("'" + Path(__file__).stem + ".py' v" + __version__)
"""
# Configure logging (optional)
import os.path
from pathlib import Path
import logging
logging.basicConfig(filename=Path(Path.cwd()).joinpath(os.path.basename(__file__).split('.')[0]+".log"), encoding='utf-8', level=logging.DEBUG, format='%(asctime)s %(message)s')
logging.info("Script start..") # Use logging.info("") to add something to the log
"""
def get_folders_or_files(folder=None, return_folders=True, recursive=False, show_errors=True, verbose=False):
    """
    Return a list of all subfolders or files within 'folder', as Path objects.

    Arguments:
        folder: the folder to search; may be a str, pathlib.Path, or os.DirEntry.
        return_folders: True returns the subfolders of folder; False returns files.
        recursive: True recursively searches all subfolders of folder.
        show_errors: True prints errors (typically PermissionError on protected
                     folders) encountered during a recursive search.
        verbose: reserved for caller-side debugging output (currently unused).

    Raises:
        ValueError if folder is None, TypeError for an unsupported folder type,
        NotADirectoryError if folder does not exist.  All are subclasses of
        Exception, so callers catching Exception are unaffected.
    """
    from pathlib import Path
    import os

    # Validate 'folder' before using it.  The None check must come first:
    # previously it was performed after the type check and was unreachable.
    if folder is None:
        raise ValueError("Argument 'folder' was not passed to the fn " + get_folders_or_files.__name__ + "() .")
    if isinstance(folder, str):
        # BUG FIX: was Path(str), which built a Path from the *type object*
        # 'str' and therefore never honored a string argument.
        folder = Path(folder)
    if not isinstance(folder, (os.DirEntry, Path)):
        raise TypeError("Unknown type of " + str(type(folder)) + " passed to " + get_folders_or_files.__name__ + ": " + str(folder))
    if not folder.is_dir():
        raise NotADirectoryError("Folder does not exist: " + str(folder))

    def get_files_scandir(folder, show_errors=True):
        # Recursively search 'folder' for files; return a list of Path.
        # os.scandir() is much faster than Path.rglob("*") (~7x, see below).
        files = []
        try:
            with os.scandir(folder) as entries:
                for entry in entries:
                    if entry.is_file():
                        files.append(Path(entry))
                    else:
                        # Descend into subfolders; propagate show_errors
                        # (previously the recursion dropped it).
                        files.extend(get_files_scandir(entry, show_errors=show_errors))
        except Exception as e:
            # Typically PermissionError on protected system folders.
            if show_errors: print(e)
        return files

    def get_folders_scandir(folder, show_errors=True):
        # Recursively search 'folder' for subfolders; return a list of Path.
        folders = []
        try:
            with os.scandir(folder) as entries:
                for entry in entries:
                    if entry.is_dir():
                        folders.append(Path(entry))
                        folders.extend(get_folders_scandir(entry, show_errors=show_errors))
        except Exception as e:
            if show_errors: print(e)
        return folders

    if return_folders:
        if recursive:
            # os.scandir()-based recursion; Path.rglob("*") takes ~7x longer.
            return get_folders_scandir(folder, show_errors=show_errors)
        # Only the immediate subfolders of folder (no recursion).
        return [f for f in folder.iterdir() if f.is_dir()]
    else:
        if recursive:
            return get_files_scandir(folder, show_errors=show_errors)
        # Only the files directly in folder, not within its subfolders.
        return [f for f in folder.iterdir() if f.is_file()]
if __name__ == '__main__':
    # No active demo code: every example below is kept inside a triple-quoted
    # string literal.  Remove the quotes around a section to try it.
    pass
"""
# Get the current script folder
from pathlib import Path
folder = Path(Path.cwd())
if not folder.is_dir(): raise Exception("Folder doesn't exist or other error: " + str(folder))
print("Current script folder: " + str(folder))
# To get a known subfolder: folder = folder.joinpath("subfolder_name")
"""
"""
# Define a file from a specific drive and path on the Windows OS
# and get information about it
import os.path
from pathlib import Path
# Note below how the backslash is escaped with a backslash, e.g. \\
# The path below is from a PC running Windows OS.
file = Path("F:\\archive\\Music\\music_library_master\\Sia\\05 - Unstoppable.mp3")
print("file.exists(): ", file.exists()) # True if file or folder exists
print("file.is_dir(): ", file.is_dir()) # True if folder exists
print("file.is_file(): ", file.is_file()) # True if file exists
print("file.name: ", file.name) # full filename with extension
print("file.stem: ", file.stem) # filename only, without extension
print("file.suffix: ", file.suffix) # filename extension e.g. .mp3
print("file.parent:", file.parent) # The full parent path without the final backslash. F:\archive\Music\music_library_master\Sia
print("file.anchor: ", file.anchor) # drive and root e.g. F:\
print("file.with_name(): ", file.with_name("my_new_filename")) # renames the file. Returns full path with new filename
print("file.drive: ", file.drive) # drive e.g. F:
print("file.parts: ", file.parts) # Tuple of drive, parent folders, and filename ('F:\\', 'archive', 'Music', 'music_library_master', 'Sia', '05 - Unstoppable.mp3')
print("file.root: ", file.root) # \
print("file.suffixes: ", file.suffixes) # ['.mp3']
print("file.as_posix(): ", file.as_posix()) # String representation of path with forward slashes F:/archive/Music/music_library_master/Sia/05 - Unstoppable.mp3
print("file.as_uri(): ", file.as_uri()) # file:///F:/archive/Music/music_library_master/Sia/05%20-%20Unstoppable.mp3
print("file.is_block_device(): ", file.is_block_device()) # True if the file points to hardware that manages data in fixed-size segments called blocks.
print("file.is_char_device(): ", file.is_char_device()) # True if the file points to hardware that manages data as a stream of bytes.
"""
"""
# Get the path to the user's home directory
from pathlib import Path
folder_home = Path.home() # Returns the path to the user's home directory
print("folder_home: '" + str(folder_home) + "'")
# Get the user's Documents folder
folder_home = folder_home.joinpath("Documents")
print("folder_home: '" + str(folder_home) + "'")
"""
"""
# Determine what the operating system (OS) is
# Several options including os and sys, but the 'platform' library works well and also provides the OS version
import platform
print("platform.system():", platform.system())
# platform.system() returns: 'Linux', 'Darwin', 'Java', 'Windows'
print("platform.system():", platform.version()) # 10.0.22631
print("platform.system():", platform.release()) # 10
"""
"""
# Get the OS temporary folder and create a unique subfolder.
# Create a new temporary file in the new subfolder.
# Delete the temporary file, and then the temporary folder.
import os
from tempfile import gettempdir, TemporaryFile
from pathlib import Path
import uuid
from shutil import rmtree
# Show the OS temp folder
print("gettempdir():", gettempdir())
# Get a new subfolder under the OS temp folder that doesn't already exist
#tmp_folder = os.path.join(gettempdir(), '.{}'.format(hash(os.times())))
tmp_folder = os.path.join(gettempdir(), uuid.uuid4().hex)
print("tmp_folder:", tmp_folder)
tmp_folder = Path(tmp_folder)
if tmp_folder.is_dir(): raise Exception("Folder already exists: " + str(tmp_folder))
# Create the subfolder
os.makedirs(tmp_folder)
if not tmp_folder.is_dir(): raise Exception("Folder could not be created: ", tmp_folder)
# Create a new temporary file (that doesn't already exist)
tmp_file = tmp_folder.joinpath(uuid.uuid4().hex + ".csv")
print("tmp_file '" + tmp_file.name + "'")
# Now you can create the file
with open(tmp_file, 'w') as f:
f.write("id; num_int; num_float; str1")
lines = []
line = str(0) + ";" + str(32761) + ";" + str(1.234) + ";" + "abcde" + "\n"
lines.append(line)
line = str(1) + ";" + str(32762) + ";" + str(2.345) + ";" + "fghijk" + "\n"
lines.append(line)
f.writelines(lines)
# File is automatically closed at the end of the "with open() as f:" block.
# Make sure the file exists:
if not tmp_file.is_file(): raise Exception("tmp_file not created! " + str(tmp_file))
# When done with the file and folder, delete them..
# Delete the temporary file if it exists
tmp_file.unlink(missing_ok=True)
if tmp_file.is_file(): raise Exception("tmp_file could not be deleted! " + str(tmp_file))
# Delete the subfolder
rmtree(tmp_folder, ignore_errors=True)
if tmp_folder.is_dir(): raise Exception("Folder could not be deleted: ", tmp_folder)
print("The new temporary folder has been deleted: ", tmp_folder)
"""
"""
# Get a random subfolder from a folder
import os
import random
from pathlib import Path
folder = Path.home()
rnd_folder = folder.joinpath(random.choice([x for x in folder.iterdir() if x.is_dir()]))
if not rnd_folder.is_dir(): raise Exception("Not a folder that exists: '" + str(rnd_folder) + "'")
print("rnd_folder: '" + str(rnd_folder) + "'")
"""
"""
# Get a random file from a folder
import os
import random
from pathlib import Path
folder = Path.home()
rnd_file = folder.joinpath(random.choice([x for x in folder.iterdir() if x.is_file()]))
if not rnd_file.is_file(): raise Exception("Not a file that exists: '" + str(rnd_file) + "'")
print("rnd_file: '" + str(rnd_file) + "'")
"""
"""
# Get all of the subfolders within a folder
from pathlib import Path
folder = Path.home()
# Get all of the subfolders in the user's home directory (one level deep only, not recursively).
folders = get_folders_or_files(folder=folder, return_folders=True, recursive=False)
print(str(len(folders)) + " folders found in " + str(folder))
for path in folders:
print("\t'" + str(path) + "'")
#print("\t'" + path.name + "'")
"""
"""
# Get all of the subfolders within a folder recursively
import time
from pathlib import Path
t_start_sec = time.perf_counter()
folder = Path.home()
# Get all of the files in all subfolders in the user's home directory.
folders = get_folders_or_files(folder=folder, return_folders=True, recursive=True, show_errors=True)
print(str(len(folders)) + " folders found in " + str(folder))
t_end_sec = time.perf_counter()
print(str(round(t_end_sec-t_start_sec,1)) + " sec")
# Show some of the contents in folders
i = 0
for path in folders:
print("\t'" + str(path) + "'")
i += 1
if i > 5: break
# 100260 folders found in 11 sec
"""
"""
# Get all of the files in folder
from pathlib import Path
folder = Path.home()
# Get all of the files in the user's home directory (do not recursively search the subfolders).
files = get_folders_or_files(folder=folder, return_folders=False, recursive=False)
print(str(len(files)) + " files found in " + str(folder))
i = 0
for file in files:
print("\t'" + str(file) + "'")
"""
"""
# Get all of the files in all subfolders from within a folder
from pathlib import Path
folder = Path.home()
# Get all of the files in all subfolders in the user's home directory.
files = get_folders_or_files(folder=folder, return_folders=False, recursive=True, show_errors=True)
print(str(len(files)) + " files found in " + str(folder))
i = 0
for file in files:
print("\t'" + str(file) + "'")
i += 1
if i > 10: break
# 695682 files found in 70 sec (old method), now 11 to 15 sec
"""
# Write, read, append to a CSV file
"""
import csv
from pathlib import Path
path_file = Path(Path.cwd()).joinpath('junk.csv')
# Write CSV
rows = [['Garden Hollow Road Designated Dispersed Campsite 10', 'https://www.campendium.com/garden-hollow-road-campsite-10?source=search-results-list'], ['Gas Well Equestrian Camping Area', 'https://www.campendium.com/gas-well-equestrian-camping-area?source=search-results-list'], ['Spruce Run Road Designated Dispersed Campsite-5', 'https://www.campendium.com/spruce-run-road-campsite-5?source=search-results-list']]
header = ["name","url"]
with open(path_file, "w", newline='') as f:
csv_writer = csv.writer(f, delimiter =';')
# write the header
csv_writer.writerow(header)
# write the rows
csv_writer.writerows(rows)
# Read CSV
with open(path_file, "r", newline='') as f:
csv_reader = csv.reader(f, delimiter=';')
for row in csv_reader:
# Each row read from the csv file is returned as a list of strings.
#print(', '.join(row))
print(row[0] + "; " + row[1])
# Append row to existing CSV
row = ['Blue Mountain Westbound Service Plaza', 'https://www.campendium.com/blue-mountain-service-plaza-westbound?source=search-results-list']
with open(path_file, "a", newline='') as f:
csv_writer = csv.writer(f, delimiter =';')
# write (append) the row
csv_writer.writerow(row)
"""
# Write & read to a Pickle file
"""
import pickle
from pathlib import Path
path_file = Path(Path.cwd()).joinpath('junk.pickle')
data = {}
data['name'] = 'Garden Hollow Road Designated Dispersed Campsite 10'
data['lat'] = 41.0275
data['lon'] = -77.1604
with open(path_file, 'wb') as f:
pickle.dump(data, f)
del data
with open(path_file, 'rb') as f:
data = pickle.load(f)
for key in data:
print(key, data[key], type(data[key]))
"""
# Write & read from a Parquet file.
"""
# A Parquet file is a standardized open-source columnar storage format
# for use in data analysis systems. The file size is small and
# and reading/writing is very fast. Metadata is also supported.
import csv
from pathlib import Path
# Write the CSV file
path_file_csv = Path(Path.cwd()).joinpath('time_series.csv')
rows = [
["2023-03-02 13:46:46",1677764806,32762,1.17549435e-38,False,"str,ABC"],
["2023-03-02 13:46:47",1677764807,-32761,-1.17549435e-38,True,""],
["2023-03-02 13:46:46",1677764808,-100,1.7287196056221583e+38,True,"str DEF"]
]
header = ["datetime","unix_s","int","float","bool1","text"]
with open(path_file_csv, "w", newline='') as f:
csv_writer = csv.writer(f, delimiter =';')
# write the header
csv_writer.writerow(header)
# write the rows
csv_writer.writerows(rows)
# The CSV file looks like this:
# datetime;unix_s;int;float;bool1;text
# 2023-03-02 13:46:46;1677764806;32762;1.17549435e-38;False;str,ABC
# 2023-03-02 13:46:47;1677764807;-32761;-1.17549435e-38;True;
# 2023-03-02 13:46:46;1677764808;-100;1.7287196056221583e+38;True;str DEF
# Read the CSV file into memory using the PyArrow library
# pip install pyarrow
from pyarrow import parquet
from pyarrow import csv as pa_csv
parse_options = pa_csv.ParseOptions(delimiter=";")
table = pa_csv.read_csv(path_file_csv, parse_options=parse_options)
#print(type(table)) #
# Write the Parquet file
path_file_parquet = Path(Path.cwd()).joinpath('time_series.parquet')
parquet.write_table(table, path_file_parquet)
del table
# Read the Parquet file into memory
#table = parquet.read_table(path_file_parquet) # To a PyArrow table
#table = parquet.read_table(path_file_parquet).to_pandas() # To a Pandas table
table = parquet.read_table(path_file_parquet).to_pydict() # To a Python dictionary
print("Parquet file " + str(path_file_parquet) + ":")
print(table)
"""
# Use DuckDB as a transformation engine to read a CSV file and save it to a Parquet file.
"""
import csv
from pathlib import Path
# Write the CSV file
path_file_csv = Path(Path.cwd()).joinpath('time_series.csv')
rows = [
["2023-03-02 13:46:46",1677764806,32762,1.17549435e-38,False,"str,ABC"],
["2023-03-02 13:46:47",1677764807,-32761,-1.17549435e-38,True,""],
["2023-03-02 13:46:48",1677764808,-100,1.7287196056221583e+38,True,"str DEF"]
]
header = ["datetime","unix_s","int","float","bool1","text"]
with open(path_file_csv, "w", newline='') as f:
csv_writer = csv.writer(f, delimiter =';')
# write the header
csv_writer.writerow(header)
# write the rows
csv_writer.writerows(rows)
# The CSV file looks like this:
# datetime;unix_s;int;float;bool1;text
# 2023-03-02 13:46:46;1677764806;32762;1.17549435e-38;False;str,ABC
# 2023-03-02 13:46:47;1677764807;-32761;-1.17549435e-38;True;
# 2023-03-02 13:46:48;1677764808;-100;1.7287196056221583e+38;True;str DEF
# Read the CSV file with duckdb
import duckdb # pip install duckdb
# create a DuckDB session
# Instantiate the database connector with an empty string
# so that DuckDB doesn't create its own database file.
# We want to use DuckDB as a transformation engine and don't need
# to create tables, views, etc.
cursor = duckdb.connect("")
# Define the Parquet file to write to and delete it if it already exists
path_file_parquet = Path(Path.cwd()).joinpath('time_series.parquet')
if path_file_parquet.exists(): path_file_parquet.unlink()
if path_file_parquet.exists(): raise Exception("Parquet file exists after attempt to unlink(): ", path_file_parquet.exists())
# Define a query.
# The outer expression is a simple COPY … TO … , which writes the inner query’s result to a file.
query = "COPY (SELECT * FROM "
query += "read_csv('" + str(path_file_csv) + "', filename=True, delim = ';', header=True, AUTO_DETECT=TRUE)"
query += ")"
query += " TO '" + str(path_file_parquet) + "' (FORMAT 'parquet');"
print("\n")
print(query)
print("\n")
# COPY (SELECT * FROM read_csv('C:\\Users\\Mark Kiehl\\Documents\\computer\\Python\\venv\\savvy\\time_series.csv', filename=True, delim = '|', header=True, AUTO_DETECT=TRUE)) TO 'C:\\Users\\Mark Kiehl\\Documents\\computer\\Python\\venv\savvy\\time_series.parquet' (FORMAT 'parquet');
cursor.execute(query)
cursor.close()
# Read the Parquet file into memory
# pip install pyarrow
from pyarrow import parquet
#table = parquet.read_table(path_file_parquet) # To a PyArrow table
#table = parquet.read_table(path_file_parquet).to_pandas() # To a Pandas table
table = parquet.read_table(path_file_parquet).to_pydict() # To a Python dictionary
print("Parquet file " + str(path_file_parquet) + ":")
print(table)
"""
# Use DuckDB as a transformation engine to read multiple CSV files and save them to a Parquet file.
"""
# Although the example CSV files are small, they could total many GB.
# DuckDB also has a much better performance than SQLite (5 to 10x).
import csv
from pathlib import Path
# Write the CSV file
path_file_csv = Path(Path.cwd()).joinpath('time_series_1.csv')
rows = [
["2023-03-02 13:46:46",1677764806,32762,1.17549435e-38,False,"str,ABC"],
["2023-03-02 13:46:47",1677764807,-32761,-1.17549435e-38,True,""],
["2023-03-02 13:46:48",1677764808,-100,1.7287196056221583e+38,True,"str DEF"]
]
header = ["datetime","unix_s","int","float","bool1","text"]
with open(path_file_csv, "w", newline='') as f:
csv_writer = csv.writer(f, delimiter =';')
# write the header
csv_writer.writerow(header)
# write the rows
csv_writer.writerows(rows)
# Write two more CSV files
path_file_csv = Path(Path.cwd()).joinpath('time_series_2.csv')
rows = [
["2023-03-02 13:46:49",1677764809,32769,1.17549435e-38,False,"str,ABC"],
["2023-03-02 13:46:50",1677764810,-32759,-1.17549435e-38,True,""],
["2023-03-02 13:46:51",1677764811,-200,1.7287196056221583e+38,True,"str DEF"]
]
with open(path_file_csv, "w", newline='') as f:
csv_writer = csv.writer(f, delimiter =';')
# write the header
csv_writer.writerow(header)
# write the rows
csv_writer.writerows(rows)
path_file_csv = Path(Path.cwd()).joinpath('time_series_3.csv')
rows = [
["2023-03-02 13:46:52",1677764812,32758,1.17549435e-38,False,"str,ABC"],
["2023-03-02 13:46:53",1677764813,-32757,-1.17549435e-38,True,""],
["2023-03-02 13:46:54",1677764814,-300,1.7287196056221583e+38,True,"str DEF"]
]
with open(path_file_csv, "w", newline='') as f:
csv_writer = csv.writer(f, delimiter =';')
# write the header
csv_writer.writerow(header)
# write the rows
csv_writer.writerows(rows)
# Read the CSV file with duckdb
import duckdb # pip install duckdb
# Define the Parquet file to write to and delete it if it already exists
path_file_parquet = Path(Path.cwd()).joinpath('time_series_3x.parquet')
if path_file_parquet.exists(): path_file_parquet.unlink()
if path_file_parquet.exists(): raise Exception("Parquet file exists after attempt to unlink(): ", path_file_parquet.exists())
# create a DuckDB session
# Instantiate the database connector with an empty string
# so that DuckDB doesn't create its own database file.
# We want to use DuckDB as a transformation engine and don't need
# to create tables, views, etc.
cursor = duckdb.connect("")
# Define a query.
# The outer expression is a simple COPY … TO … , which writes the inner query’s result to a file.
query = "COPY (SELECT * FROM "
query += "read_csv('time_series_*.csv', filename=True, delim = ';', header=True, AUTO_DETECT=TRUE)"
query += ")"
query += " TO 'time_series_3x.parquet' (FORMAT 'parquet');"
cursor.execute(query)
cursor.close()
# Read the Parquet file into memory
# pip install pyarrow
from pyarrow import parquet
#table = parquet.read_table(path_file_parquet) # To a PyArrow table
#table = parquet.read_table(path_file_parquet).to_pandas() # To a Pandas table
table = parquet.read_table(path_file_parquet).to_pydict() # To a Python dictionary
print("Parquet file " + str(path_file_parquet) + ":")
print(table)
"""
# NOTE: the following web-page footer text was accidentally captured with the
# source and is not valid Python; it is commented out so the file can parse.
# escaped code goes here
# https://www.freeformatter.com/html-escape.html