NamedTuple Data Structure | Boost Recursive Functions With Cache | Python Concurrency (ThreadPoolExecutor)
from datetime import datetime
from typing import NamedTuple

class NewsData(NamedTuple):
    # Define a data structure to hold news data consisting of:
    # author, content, title, date, image_url, article_url, description
    author: str
    content: str
    title: str
    date: datetime
    image_url: str
    article_url: str
    description: str
# Create data to send using the NewsData NamedTuple
news_data = NewsData(
    author='Evan Ramstad',
    content="America's political parties for decades have been shaped by the tension between the two poles of capitalism Democrats with labor and Republicans with capital.\r\nThis year, some Republicans say that, b… [+4792 chars]",
    title='Ramstad: Labor unions flex new muscle in politics, as they have recently in boardrooms',
    date=datetime.strptime('2024-07-27T13:03:00Z', "%Y-%m-%dT%H:%M:%SZ"),
    image_url='https://arc.stimg.co/startribunemedia/2QOPOYRZKVAZ3HEMEDLL72BXGY.jpg?h=630&w=1200&fit=crop&bg=999&crop=faces',
    article_url='https://www.startribune.com/ramstad-labor-unions-flex-new-muscle-in-politics-as-they-have-recently-in-boardrooms/600386691/',
    description="The authenticity of the GOP's outreach to unions is yet to be proven. But it's a sign of the growing power of the labor movement."
)
# Fetch data from the NewsData NamedTuple
print("news_data.title:" + news_data.title)
print("news_data.date: " + news_data.date.strftime("%Y-%m-%d"))
from functools import cache

@cache
def my_recursive_fn(n):
    # Base case so the recursion terminates
    if n >= 10:
        return n
    n += 1
    return my_recursive_fn(n)
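The functools @cache decorator memoizes return values, so repeated calls with the same argument return instantly instead of re-running the recursion. To see the speed-up on a recursive function, here is a minimal sketch (the Fibonacci functions are illustrative, not part of the original example):
import time
from functools import cache

def fib_uncached(n):
    # Classic exponential-time recursive Fibonacci
    return n if n < 2 else fib_uncached(n - 1) + fib_uncached(n - 2)

@cache
def fib_cached(n):
    # Same recursion, but every result is memoized by functools.cache
    return n if n < 2 else fib_cached(n - 1) + fib_cached(n - 2)

t0 = time.perf_counter()
fib_uncached(32)
print(f"Uncached: {time.perf_counter() - t0:.3f} s")
t0 = time.perf_counter()
fib_cached(32)
print(f"Cached: {time.perf_counter() - t0:.3f} s")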
Instead of a traditional loop, you can use the ThreadPoolExecutor class, which is a high-level interface for running functions asynchronously using threads. By default it uses min(32, os.cpu_count() + 4) worker threads, so on an 8-core machine the function runs on 12 threads.
Include the code block below before every code snippet that follows.
import time
import random
import requests
# pip install requests_ratelimiter
from requests_ratelimiter import LimiterSession

# Session for the optional rate-limited call below - limits requests to a max of 2 calls per second
request_session = LimiterSession(per_second=2)

def get_post(post_id: int) -> dict:
    # Value check - posts on the API only go up to an ID of 100
    if post_id > 100:
        raise ValueError("Parameter `post_id` must be less than or equal to 100")
    # Time the execution of this function
    fn_start = time.perf_counter()
    # API URL
    url = f"https://jsonplaceholder.typicode.com/posts/{post_id}"
    # Sleep to imitate a long-running process
    # time.sleep(random.randint(1, 10))
    # Fetch the data and return it
    r = requests.get(url)
    # Optionally limit the request rate using requests_ratelimiter instead:
    # r = request_session.get(url)
    r.raise_for_status()
    result = r.json()
    # Record how much time fetching took
    result["fetch_time"] = round(time.perf_counter() - fn_start, 1)
    # Remove the longest key-value pair for formatting reasons
    del result["body"]
    return result
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
# Get the number of CPU cores using os.cpu_count() and calculate the
# default number of threads the ThreadPoolExecutor class will use
print("\nNumber of CPU cores: " + str(os.cpu_count()))
default_threads = min(32, os.cpu_count() + 4)
print("Default # threads ThreadPoolExecutor will use: " + str(default_threads) + "\n")
# Number of CPU cores: 16
# Default # threads ThreadPoolExecutor will use: 20
# NOTE: The values above will vary based on your PC capabilities
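If the default is not what you want, set the pool size explicitly with the max_workers argument; a small sketch (the value 8 is arbitrary):
# Override the default worker count with max_workers
with ThreadPoolExecutor(max_workers=8) as tpe:
    results = list(tpe.map(get_post, range(1, 9)))
    print(len(results))   # 8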
# Task without Python concurrency
# Measure the time
t_start = time.perf_counter()
print("Fetching data without ThreadPoolExecutor...\n")
# Simple iteration
# NOTE: range() purposely exceeds the value of default_threads
for post_id in range(1, default_threads*2):
    post = get_post(post_id)
    print(post)
print(f"\nAll posts fetched! Took: {round(time.perf_counter()-t_start, 1)} seconds.\n")
# Task with Python concurrency (ThreadPoolExecutor())
t_start = time.perf_counter()
print("Fetching data with ThreadPoolExecutor...\n")
# Run post fetching concurrently using the .submit() method (preferred).
# Unlike iterating over .map() results, which yields results in submission order,
# .submit() returns immediately with a Future object that provides a handle on the task.
# The Future object allows the running asynchronous task to be queried, canceled,
# and its result to be retrieved later once the task is done.
# Note that you can also configure a callback handler to process results as they
# become available (see the sketch after the consolidated example below).
with ThreadPoolExecutor() as tpe:
    # Issue tasks to the thread pool
    futures = []
    for post_id in range(1, default_threads*2):
        future = tpe.submit(get_post, post_id)
        futures.append(future)
    # Process the results from the tasks
    for future in futures:
        result = future.result()
        print(result)
print(f"\nAll posts fetched! Took: {round(time.perf_counter()-t_start, 1)} seconds.\n")
# This block of code is the same as before, but the .submit() block is consolidated
# and as_completed() yields each future as soon as its task finishes.
t_start = time.perf_counter()
with ThreadPoolExecutor() as tpe:
    # Submit tasks and get future objects
    futures = [tpe.submit(get_post, post_id) for post_id in range(1, default_threads*2)]
    # Process task results as they complete
    for future in as_completed(futures):
        # Get and display the result
        result = future.result()
        print(result)
print(f"\nAll posts fetched! Took: {round(time.perf_counter()-t_start, 1)} seconds.\n")
# All posts fetched! Took: 1.8 seconds.
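As mentioned above, results can also be handled by a callback instead of looping over the futures. A minimal sketch using Future.add_done_callback() (handle_result is an illustrative name, not part of the original example):
def handle_result(future):
    # Called with the completed Future as soon as its task finishes
    print(future.result())

t_start = time.perf_counter()
with ThreadPoolExecutor() as tpe:
    for post_id in range(1, default_threads*2):
        tpe.submit(get_post, post_id).add_done_callback(handle_result)
print(f"\nAll posts fetched! Took: {round(time.perf_counter()-t_start, 1)} seconds.\n")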
# Using the ThreadPoolExecutor() map() method
t_start = time.perf_counter()
with ThreadPoolExecutor() as tpe:
    # Note: the 2nd argument to .map() can be any iterable. Ex. posts = list(range(1, default_threads*2))
    for result in tpe.map(get_post, range(1, default_threads*2)):
        print(result)
print(f"\nAll posts fetched! Took: {round(time.perf_counter()-t_start, 1)} seconds.\n")
# All posts fetched! Took: 2.3 seconds.
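get_post() raises a ValueError for IDs above 100, and raise_for_status() can raise as well; future.result() re-raises any exception from its task. A minimal sketch of handling such failures (the ID range deliberately exceeds 100 and is illustrative):
t_start = time.perf_counter()
with ThreadPoolExecutor() as tpe:
    # Map each future back to the post_id it was submitted with
    futures = {tpe.submit(get_post, post_id): post_id for post_id in range(95, 105)}
    for future in as_completed(futures):
        post_id = futures[future]
        try:
            print(future.result())
        except ValueError as exc:
            # Raised by get_post() for IDs above 100
            print(f"Post {post_id} skipped: {exc}")
        except requests.RequestException as exc:
            # Raised by raise_for_status() or on network errors
            print(f"Post {post_id} failed: {exc}")
print(f"\nDone in {round(time.perf_counter()-t_start, 1)} seconds.\n")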
Python Concurrency — A Brain-Friendly Guide for Data Professionals
Python ThreadPoolExecutor: 7-Day Crash Course
How to Add a Callback to the ThreadPoolExecutor in Python
Pydantic is Python dataclasses with validation, serialization, and data transformation functions. You use Pydantic to check that your data is valid, transform it into the shapes you need, and then serialize the results so they can be passed on to other applications.
# pip install pydantic
from pydantic import BaseModel
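A minimal sketch assuming Pydantic v2 (NewsArticle is an illustrative model whose fields mirror NewsData above):
from datetime import datetime
from pydantic import BaseModel, ValidationError

class NewsArticle(BaseModel):
    author: str
    title: str
    date: datetime
    article_url: str

# Pydantic validates and transforms input: the ISO date string is parsed into a datetime
article = NewsArticle(
    author='Evan Ramstad',
    title='Ramstad: Labor unions flex new muscle in politics, as they have recently in boardrooms',
    date='2024-07-27T13:03:00Z',
    article_url='https://www.startribune.com/ramstad-labor-unions-flex-new-muscle-in-politics-as-they-have-recently-in-boardrooms/600386691/',
)
print(article.date.strftime("%Y-%m-%d"))   # 2024-07-27
# Serialize the validated model back out as JSON
print(article.model_dump_json())
# Invalid data raises a ValidationError instead of silently passing through
try:
    NewsArticle(author='x', title='y', date='not a date', article_url='z')
except ValidationError as exc:
    print(exc)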
A Practical Guide to using Pydantic by Marc Nealer