Standard I/O streams in Python
Working with input-output (I/O) streams in Python is a fundamental aspect of programming that includes processing standard input, output, and error streams, as well as working with files, network connections, and other data sources. Understanding these mechanisms is critical to building efficient and reliable applications.
Basic standard streams
In Python, standard I/O streams are represented by three main objects from the sys module:
- sys.stdin is a stream for reading data from standard input (usually a keyboard)
- sys.stdout is a stream for outputting data to standard output (usually a terminal screen)
- sys.stderr is a stream for outputting error messages to the standard error stream
Reading data from standard input
import sys
# Read a single line (readline() keeps the trailing newline) and echo it trimmed.
first_line = sys.stdin.readline()
print("You have entered:", first_line.strip())

# Read everything still pending on stdin, up to end-of-file.
remaining = sys.stdin.read()
print("All data:", remaining)

# Iterate over stdin one line at a time.
# NOTE: when run right after read() above, stdin is already exhausted,
# so this loop body does not execute; the three techniques are alternatives.
for raw_line in sys.stdin:
    stripped = raw_line.strip()
    print("Line:", stripped)
Data output to standard streams
import sys
# Write to the standard output stream
sys.stdout.write("Hello world!\n")
sys.stdout.flush()  # Push any buffered text out immediately

# Write an error message to the standard error stream
sys.stderr.write("This is an error message!\n")
sys.stderr.flush()

# print() can target either stream through its file= argument
print("Normal message", file=sys.stdout)
print("Error message", file=sys.stderr)
Working with files
The main file opening modes
Python provides the open() function for working with files in various modes:
- 'r' - read (default)
- 'w' - write (overwrites an existing file)
- 'a' - appending to the end of the file
- 'x' - exclusive creation (the file must not exist)
- 'b' - binary mode (for example, 'rb', 'wb')
- 't' - text mode (default)
Examples of working with files
# Read the whole file into a single string.
with open("example.txt", encoding="utf-8") as reader:  # mode defaults to "r"
    content = reader.read()
print(content)

# Stream the file one line at a time.
with open("example.txt", encoding="utf-8") as reader:
    for text_line in reader:
        print(text_line.strip())

# Create (or truncate) a file and write two lines to it.
with open("output.txt", mode="w", encoding="utf-8") as writer:
    writer.writelines(["Hello, world!\n", "Second line\n"])

# Append to the end of the existing file.
with open("output.txt", mode="a", encoding="utf-8") as writer:
    writer.write("Added string\n")

# Read raw bytes; binary mode takes no encoding argument.
with open("image.jpg", mode="rb") as image:
    data = image.read()
size = len(data)
print(f"File size: {size} bytes")
Error handling when working with files
import os
# Handle the most specific failures first: FileNotFoundError and
# PermissionError are subclasses of OSError (IOError is an alias of it),
# so the generic handler has to come last.
try:
    with open("nonexistent.txt", "r", encoding="utf-8") as file:
        content = file.read()
except FileNotFoundError:
    print("File not found")
except PermissionError:
    print("No file access rights")
except IOError as e:
    print(f"Input/output error: {e}")

# Checking the existence of the file.
# The checked path must be exactly the path that is opened (no stray
# whitespace). The file can still disappear between the check and open(),
# so the try/except form above is the more robust of the two.
if os.path.exists("example.txt"):
    with open("example.txt", "r", encoding="utf-8") as file:
        content = file.read()
Working with network streams
TCP server
import socket
import threading
def handle_client(conn, addr):
    """Echo everything a connected client sends until it disconnects.

    Args:
        conn: Connected socket-like object providing recv(), sendall()
            and close().
        addr: Peer address; used only in the log messages.

    The connection is always closed before returning, even if an error
    occurs while serving the client.
    """
    print(f'The client is connected: {addr}')
    try:
        while True:
            data = conn.recv(1024)
            if not data:
                # An empty read means the peer closed the connection.
                break
            # Decode for display only. errors='replace' keeps a binary
            # payload, or a multi-byte character split across two recv()
            # calls, from raising UnicodeDecodeError and ending the session.
            text = data.decode(errors='replace')
            print(f'Received from {addr}: {text}')
            conn.sendall(data)  # Echo the original bytes unchanged
    except Exception as e:
        # Broad on purpose: this is the top of a per-client worker, so any
        # failure is reported and the connection is cleaned up below.
        print(f' Error during client processing {addr}: {e}')
    finally:
        conn.close()
        print(f'Connection to {addr} is closed')
def start_server(host='127.0.0.1', port=12345):
    """Run a threaded TCP echo server until the process is interrupted.

    Args:
        host: Address to bind to. Defaults to the loopback interface.
        port: TCP port to listen on.

    Every accepted connection is handed to handle_client() in its own
    thread, so one slow client does not block the others.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # SO_REUSEADDR lets the server rebind to the port right after a restart.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((host, port))
        server.listen(5)
        print(f' Server running on {host}:{port}')
        while True:
            conn, addr = server.accept()
            client_thread = threading.Thread(
                target=handle_client,
                args=(conn, addr),
            )
            client_thread.start()


if __name__ == "__main__":
    start_server()
TCP client
import socket
def tcp_client(host='127.0.0.1', port=12345, message=b'Hello, world!'):
    """Send one message to a TCP server and print the reply.

    Args:
        host: Server address. Defaults to the loopback interface.
        port: Server TCP port.
        message: Bytes to send.

    Connection and I/O errors are reported on stdout rather than raised.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
            client.connect((host, port))
            # Sending data
            client.sendall(message)
            # Getting a response: a single recv() returns at most 1024
            # bytes, which is enough for the short default message.
            data = client.recv(1024)
            print(f'Received: {data.decode()}')
    except ConnectionRefusedError:
        print("Couldn't connect to the server")
    except Exception as e:
        print(f"Error: {e}")


if __name__ == "__main__":
    tcp_client()
Buffering and performance
Buffering management
import sys
# Reducing buffering for stdout.
# A text stream cannot be fully unbuffered: open(..., 'w', buffering=0)
# raises ValueError ("can't have unbuffered text I/O"). Switching the
# existing stream to line buffering writes every completed line at once.
# (Requires Python 3.7+ and a sys.stdout that is an io.TextIOWrapper.)
sys.stdout.reconfigure(line_buffering=True)

# Working with buffering when writing files
with open("large_file.txt", "w", buffering=8192, encoding="utf-8") as file:
    for i in range(1000000):
        file.write(f"String {i}\n")
        if i % 1000 == 0:
            file.flush()  # Force buffer write
Context managers for streams
import contextlib
import sys
@contextlib.contextmanager
def redirect_stdout(new_target):
    """Temporarily point sys.stdout at ``new_target``.

    Yields ``new_target`` to the ``with`` body and puts the previous
    ``sys.stdout`` back on exit, whether or not the body raised.
    """
    # The right-hand side is evaluated first, so the old stream is saved
    # before sys.stdout is rebound.
    saved_stream, sys.stdout = sys.stdout, new_target
    try:
        yield new_target
    finally:
        sys.stdout = saved_stream
# Usage: everything printed inside the block goes to output.txt
with open("output.txt", "w") as log_file, redirect_stdout(log_file):
    for message in ("This will be written to a file", "This too"):
        print(message)
Asynchronous I/O
Using asyncio for asynchronous I/O
import asyncio
import aiofiles
async def read_file_async(filename):
    """Read ``filename`` through aiofiles and return its entire contents."""
    async with aiofiles.open(filename, mode='r') as handle:
        return await handle.read()
async def write_file_async(filename, data):
    """Write ``data`` to ``filename`` through aiofiles, opened in 'w' mode."""
    async with aiofiles.open(filename, mode='w') as handle:
        await handle.write(data)
async def main():
    """Read three files concurrently and print the length of each."""
    filenames = ["file1.txt", "file2.txt", "file3.txt"]
    # gather() runs the reads concurrently and returns the results in the
    # same order as the awaitables were passed in.
    results = await asyncio.gather(
        *(read_file_async(name) for name in filenames)
    )
    for index, content in enumerate(results, start=1):
        print(f"File {index}: {len(content)} characters")


# Running asynchronous code (guarded so that importing the module does
# not start the event loop)
if __name__ == "__main__":
    asyncio.run(main())
Best Practices
Recommendations for working with I/O streams
- Always use context managers (the with statement) to automatically close resources
- Specify the encoding when working with text files
- Handle exceptions to prevent program crashes
- Use buffering to improve performance when working with large amounts of data
- Use asynchronous I/O for high-latency operations
A comprehensive example
import sys
import logging
from pathlib import Path
# Setting up logging: every record goes both to app.log and to stdout
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('app.log'),
        logging.StreamHandler(sys.stdout),
    ],
)
def process_files(input_dir, output_dir):
    """Upper-case every ``*.txt`` file from ``input_dir`` into ``output_dir``.

    Each result is written under the original file name prefixed with
    ``processed_``. A failure on one file is logged and does not stop the
    remaining files from being processed.

    Args:
        input_dir: Directory scanned (non-recursively) for ``*.txt`` files.
        output_dir: Destination directory; created, together with any
            missing parents, if it does not exist yet.
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    # is_dir() also rejects a path that exists but is a regular file.
    if not input_path.is_dir():
        logging.error(f"The input directory does not exist: {input_dir}")
        return
    # parents=True allows nested destinations whose parent is missing.
    output_path.mkdir(parents=True, exist_ok=True)
    for file_path in input_path.glob("*.txt"):
        try:
            with open(file_path, 'r', encoding='utf-8') as infile:
                content = infile.read()
            processed_content = content.upper()
            output_file = output_path / f"processed_{file_path.name}"
            with open(output_file, 'w', encoding='utf-8') as outfile:
                outfile.write(processed_content)
            logging.info(f"Processed file: {file_path.name}")
        except Exception as e:
            # Deliberately broad: one unreadable or undecodable file must
            # not abort the rest of the batch.
            logging.error(f"Error during processing {file_path.name}: {e}")


if __name__ == "__main__":
    process_files("input", "output")
Working with I/O streams in Python provides powerful capabilities for creating efficient applications capable of processing data from various sources. Understanding and using these mechanisms properly is the key to writing reliable, high-performance code.