# Advanced Usage
This guide covers advanced features and techniques for using PyEuropePMC effectively.
## Table of Contents

- [Performance Optimization](#performance-optimization)
- [Custom Configurations](#custom-configurations)
- [Batch Processing](#batch-processing)
- [Rate Limiting and Throttling](#rate-limiting-and-throttling)
- [Caching Strategies](#caching-strategies)
- [Advanced Query Techniques](#advanced-query-techniques)
- [Integration with Other Tools](#integration-with-other-tools)
## Performance Optimization

### Concurrent Requests
```python
from concurrent.futures import ThreadPoolExecutor, as_completed

from pyeuropepmc import EuropePMC

def search_worker(query_batch):
    """Worker function for concurrent searches."""
    client = EuropePMC()
    results = []
    for query in query_batch:
        try:
            batch_results = client.search(query, limit=50)
            results.extend(batch_results)
        except Exception as e:
            print(f"Error with query '{query}': {e}")
    return results

def concurrent_search(queries, max_workers=5):
    """Perform multiple searches concurrently."""
    # Split queries into batches (at least one query per batch)
    batch_size = max(1, len(queries) // max_workers)
    query_batches = [
        queries[i:i + batch_size]
        for i in range(0, len(queries), batch_size)
    ]

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_batch = {
            executor.submit(search_worker, batch): batch
            for batch in query_batches
        }

        all_results = []
        for future in as_completed(future_to_batch):
            batch_results = future.result()
            all_results.extend(batch_results)

    return all_results

# Example usage
queries = [
    "machine learning",
    "deep learning",
    "artificial intelligence",
    "neural networks",
    "computer vision"
]

results = concurrent_search(queries, max_workers=3)
print(f"Found {len(results)} total articles")
```
### Memory-Efficient Processing
```python
def process_large_dataset(query, batch_size=100, max_results=10000):
    """Process large datasets without loading everything into memory."""
    client = EuropePMC()
    processed_count = 0

    # Generator function for streaming results
    def result_generator():
        offset = 0
        while offset < max_results:
            try:
                batch = client.search(
                    query,
                    limit=min(batch_size, max_results - offset),
                    offset=offset
                )
                if not batch:
                    break
                for article in batch:
                    yield article
                offset += len(batch)
            except Exception as e:
                print(f"Error at offset {offset}: {e}")
                break

    # Process results one at a time
    for article in result_generator():
        process_article(article)
        processed_count += 1

        if processed_count % 100 == 0:
            print(f"Processed {processed_count} articles")

    return processed_count

def process_article(article):
    """Process a single article."""
    # Example: extract and save key information
    data = {
        'title': article.title,
        'year': article.pub_year,
        'journal': article.journal,
        'author_count': len(article.authors) if article.authors else 0
    }
    # Save to database, file, or perform analysis
    save_to_database(data)
```
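`save_to_database` above is a placeholder for your own persistence step. A minimal sketch using the standard library's `sqlite3` (the table name and schema are illustrative):

```python
import sqlite3

def save_to_database(data, db_path="articles.db"):
    """Persist one article record to a local SQLite database."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS articles "
        "(title TEXT, year INTEGER, journal TEXT, author_count INTEGER)"
    )
    conn.execute(
        "INSERT INTO articles VALUES (?, ?, ?, ?)",
        (data['title'], data['year'], data['journal'], data['author_count'])
    )
    conn.commit()
    conn.close()
```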
## Custom Configurations

### Configuration Class
```python
from dataclasses import dataclass

@dataclass
class EuropePMCConfig:
    """Configuration for Europe PMC client."""
    base_url: str = "https://www.ebi.ac.uk/europepmc/webservices/rest"
    timeout: int = 30
    retries: int = 3
    rate_limit: float = 1.0
    cache_enabled: bool = True
    cache_ttl: int = 3600  # 1 hour
    user_agent: str = "PyEuropePMC/1.0"

    @classmethod
    def from_file(cls, config_file: str) -> 'EuropePMCConfig':
        """Load configuration from a JSON file."""
        import json
        with open(config_file, 'r') as f:
            config_data = json.load(f)
        return cls(**config_data)

# Usage
config = EuropePMCConfig(
    timeout=60,
    retries=5,
    rate_limit=0.5  # one request every 0.5 s, i.e. 2 requests per second
)

client = EuropePMC(config=config)
```
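To exercise `from_file`, write the settings to JSON first; the file name here is illustrative:

```python
import json

# Create a settings file, then load it back through from_file
with open("europepmc_config.json", "w") as f:
    json.dump({"timeout": 60, "retries": 5, "rate_limit": 0.5}, f)

config = EuropePMCConfig.from_file("europepmc_config.json")
```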
### Environment-Based Configuration
```python
import os
import logging

from pyeuropepmc import EuropePMC

class ConfiguredEuropePMC(EuropePMC):
    """Europe PMC client with environment-based configuration."""

    def __init__(self):
        super().__init__(
            # default_base_url is assumed to be a class attribute of EuropePMC
            base_url=os.getenv('EUROPEPMC_BASE_URL', self.default_base_url),
            timeout=int(os.getenv('EUROPEPMC_TIMEOUT', '30')),
            retries=int(os.getenv('EUROPEPMC_RETRIES', '3')),
            rate_limit=float(os.getenv('EUROPEPMC_RATE_LIMIT', '1.0'))
        )

        # Set up logging based on environment
        log_level = os.getenv('EUROPEPMC_LOG_LEVEL', 'INFO')
        self.setup_logging(log_level)

    def setup_logging(self, level: str):
        """Configure logging from a level name such as 'DEBUG'."""
        logging.basicConfig(level=getattr(logging, level.upper(), logging.INFO))
```
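A quick usage sketch; the variables are set in-process here with illustrative values, but in practice you would export them in your shell or deployment environment:

```python
# Hypothetical values for illustration
os.environ['EUROPEPMC_TIMEOUT'] = '60'
os.environ['EUROPEPMC_LOG_LEVEL'] = 'DEBUG'

client = ConfiguredEuropePMC()
```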
## Batch Processing

### Batch Search Operations
```python
class BatchProcessor:
    """Efficient batch processing for Europe PMC operations."""

    def __init__(self, client: EuropePMC, batch_size: int = 50):
        self.client = client
        self.batch_size = batch_size
        self.results_cache = {}

    def batch_search(self, queries: list, deduplicate: bool = True):
        """Process multiple queries in batches."""
        all_results = []

        for i in range(0, len(queries), self.batch_size):
            batch = queries[i:i + self.batch_size]
            batch_results = self._process_batch(batch)
            all_results.extend(batch_results)

            # Optional progress reporting
            print(f"Processed {min(i + self.batch_size, len(queries))}/{len(queries)} queries")

        if deduplicate:
            all_results = self._deduplicate_results(all_results)

        return all_results

    def _process_batch(self, queries: list):
        """Process a single batch of queries."""
        batch_results = []

        for query in queries:
            try:
                if query in self.results_cache:
                    results = self.results_cache[query]
                else:
                    results = self.client.search(query, limit=100)
                    self.results_cache[query] = results

                batch_results.extend(results)
            except Exception as e:
                print(f"Error processing query '{query}': {e}")
                continue

        return batch_results

    def _deduplicate_results(self, results):
        """Remove duplicate articles based on PMID."""
        seen_pmids = set()
        unique_results = []

        for article in results:
            if article.pmid and article.pmid not in seen_pmids:
                seen_pmids.add(article.pmid)
                unique_results.append(article)
            elif not article.pmid:
                # Keep articles without a PMID
                unique_results.append(article)

        return unique_results

# Usage
processor = BatchProcessor(client)
queries = ["cancer", "diabetes", "COVID-19", "machine learning"]
results = processor.batch_search(queries)
```
## Rate Limiting and Throttling

### Advanced Rate Limiting
```python
import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Advanced rate limiter with burst handling."""

    def __init__(self, max_requests: int, time_window: int):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = deque()
        self.lock = Lock()

    def acquire(self):
        """Return True if a request may proceed, else the time to wait."""
        with self.lock:
            now = time.time()

            # Remove old requests outside the time window
            while self.requests and self.requests[0] <= now - self.time_window:
                self.requests.popleft()

            # Check if we can make a request
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True

            # Calculate wait time until the oldest request expires
            wait_time = self.requests[0] + self.time_window - now
            return wait_time

class ThrottledEuropePMC(EuropePMC):
    """Europe PMC client with advanced throttling."""

    def __init__(self, requests_per_minute: int = 60, **kwargs):
        super().__init__(**kwargs)
        self.rate_limiter = RateLimiter(requests_per_minute, 60)

    def search(self, *args, **kwargs):
        """Search with rate limiting."""
        permission = self.rate_limiter.acquire()
        while permission is not True:
            print(f"Rate limit reached, waiting {permission:.2f} seconds")
            time.sleep(permission)
            permission = self.rate_limiter.acquire()
        return super().search(*args, **kwargs)
```
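Usage is the same as the plain client; only the pacing changes (query and limit are illustrative):

```python
client = ThrottledEuropePMC(requests_per_minute=30)
results = client.search("genomics", limit=25)  # sleeps automatically when the budget is exhausted
```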
## Caching Strategies

### Redis-Based Caching
```python
import redis
import json
import hashlib

class CachedEuropePMC(EuropePMC):
    """Europe PMC client with Redis caching."""

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 cache_ttl: int = 3600, **kwargs):
        super().__init__(**kwargs)
        self.redis_client = redis.from_url(redis_url)
        self.cache_ttl = cache_ttl

    def _get_cache_key(self, method: str, *args, **kwargs) -> str:
        """Generate a cache key for a method call."""
        key_data = {
            'method': method,
            'args': args,
            'kwargs': kwargs
        }
        key_string = json.dumps(key_data, sort_keys=True)
        return hashlib.md5(key_string.encode()).hexdigest()

    def search(self, *args, **kwargs):
        """Search with caching."""
        cache_key = self._get_cache_key('search', *args, **kwargs)

        # Try to get from cache (note: cache hits return plain dicts,
        # not result objects, because results are serialized as JSON)
        cached_result = self.redis_client.get(cache_key)
        if cached_result:
            return json.loads(cached_result)

        # Get fresh data
        results = super().search(*args, **kwargs)

        # Cache the results
        self.redis_client.setex(
            cache_key,
            self.cache_ttl,
            json.dumps([r.to_dict() for r in results])
        )

        return results

# Usage
cached_client = CachedEuropePMC(cache_ttl=7200)  # 2 hours
results = cached_client.search("machine learning")  # will be cached
```
### File-Based Caching
```python
import pickle
import hashlib
import time
from pathlib import Path

class FileCachedEuropePMC(EuropePMC):
    """Europe PMC client with file-based caching."""

    def __init__(self, cache_dir: str = "./cache", **kwargs):
        super().__init__(**kwargs)
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

    def _get_cache_path(self, method: str, *args, **kwargs) -> Path:
        """Get the cache file path for a method call."""
        key_data = f"{method}_{args}_{kwargs}"
        key_hash = hashlib.md5(key_data.encode()).hexdigest()
        return self.cache_dir / f"{key_hash}.pkl"

    def search(self, *args, **kwargs):
        """Search with file caching."""
        cache_path = self._get_cache_path('search', *args, **kwargs)

        # Use the cache only if it is fresh (less than 1 hour old)
        if cache_path.exists():
            if time.time() - cache_path.stat().st_mtime < 3600:
                with open(cache_path, 'rb') as f:
                    return pickle.load(f)

        # Get fresh data
        results = super().search(*args, **kwargs)

        # Cache the results
        with open(cache_path, 'wb') as f:
            pickle.dump(results, f)

        return results
```
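Usage mirrors the Redis variant; the cache directory name is illustrative:

```python
cached_client = FileCachedEuropePMC(cache_dir="./pmc_cache")
results = cached_client.search("machine learning")  # first call hits the API
results = cached_client.search("machine learning")  # repeat call reads ./pmc_cache
```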
## Advanced Query Techniques

### Query Builder
```python
class QueryBuilder:
    """Build complex Europe PMC queries programmatically."""

    def __init__(self):
        self.terms = []
        self.filters = []

    def add_term(self, term: str, field: str = None, operator: str = "AND"):
        """Add a search term."""
        if field:
            formatted_term = f'{field}:"{term}"'
        else:
            formatted_term = f'"{term}"'

        if self.terms:
            self.terms.append(f" {operator} {formatted_term}")
        else:
            self.terms.append(formatted_term)

        return self

    def add_author(self, author: str, operator: str = "AND"):
        """Add an author filter."""
        return self.add_term(author, "AUTH", operator)

    def add_journal(self, journal: str, operator: str = "AND"):
        """Add a journal filter."""
        return self.add_term(journal, "JOURNAL", operator)

    def add_year_range(self, start_year: int, end_year: int):
        """Add a publication year range."""
        year_filter = f"PUB_YEAR:[{start_year} TO {end_year}]"
        self.filters.append(year_filter)
        return self

    def add_mesh_term(self, mesh_term: str):
        """Add a MeSH term filter."""
        mesh_filter = f'MESH:"{mesh_term}"'
        self.filters.append(mesh_filter)
        return self

    def build(self) -> str:
        """Build the final query string."""
        query_parts = []

        if self.terms:
            query_parts.append("(" + "".join(self.terms) + ")")

        if self.filters:
            query_parts.extend(self.filters)

        return " AND ".join(query_parts)

# Usage
query = (QueryBuilder()
         .add_term("machine learning")
         .add_term("deep learning", operator="OR")
         .add_author("Smith J")
         .add_year_range(2020, 2023)
         .add_mesh_term("Artificial Intelligence")
         .build())

print(f"Generated query: {query}")
results = client.search(query)
```
### Faceted Search
```python
def faceted_search(base_query: str, facets: dict):
    """Perform faceted search with multiple filters."""
    client = EuropePMC()
    facet_results = {}

    for facet_name, facet_values in facets.items():
        facet_results[facet_name] = {}

        for value in facet_values:
            # Build query with facet filter
            if facet_name == "year":
                facet_query = f"{base_query} AND PUB_YEAR:{value}"
            elif facet_name == "journal":
                facet_query = f'{base_query} AND JOURNAL:"{value}"'
            elif facet_name == "author":
                facet_query = f'{base_query} AND AUTH:"{value}"'
            else:
                facet_query = f'{base_query} AND {facet_name}:"{value}"'

            try:
                # Note: counts are capped at the search limit (100 here)
                results = client.search(facet_query, limit=100)
                facet_results[facet_name][value] = len(results)
            except Exception as e:
                print(f"Error with facet {facet_name}={value}: {e}")
                facet_results[facet_name][value] = 0

    return facet_results

# Usage
facets = {
    "year": [2020, 2021, 2022, 2023],
    "journal": ["Nature", "Science", "Cell"],
    "author": ["Smith J", "Johnson A", "Williams R"]
}

results = faceted_search("CRISPR", facets)

print("Faceted search results:")
for facet_name, facet_data in results.items():
    print(f"\n{facet_name.title()}:")
    for value, count in facet_data.items():
        print(f"  {value}: {count} articles")
```
## Integration with Other Tools

### Pandas Integration
```python
import pandas as pd

def results_to_dataframe(results):
    """Convert search results to a pandas DataFrame."""
    data = []

    for article in results:
        data.append({
            'title': article.title,
            'authors': '; '.join(article.authors) if article.authors else '',
            'journal': article.journal,
            'year': article.pub_year,
            'pmid': article.pmid,
            'doi': article.doi,
            'abstract_length': len(article.abstract) if article.abstract else 0,
            'author_count': len(article.authors) if article.authors else 0
        })

    return pd.DataFrame(data)

# Usage
results = client.search("bioinformatics", limit=100)
df = results_to_dataframe(results)

# Analyze data
print("Publication years:")
print(df['year'].value_counts().sort_index())

print("\nTop journals:")
print(df['journal'].value_counts().head(10))

print("\nAuthor statistics:")
print(df['author_count'].describe())
```
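The DataFrame can then be saved for downstream work; the file name is illustrative:

```python
df.to_csv("europepmc_results.csv", index=False)
```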
### NetworkX Integration
```python
import networkx as nx

def build_citation_network(articles):
    """Build a citation network using NetworkX."""
    G = nx.DiGraph()

    for article in articles:
        if not article.pmid:
            continue

        # Add article as a node
        G.add_node(article.pmid,
                   title=article.title,
                   journal=article.journal,
                   year=article.pub_year)

        # Add citations as edges (uses the client defined earlier)
        try:
            citations = client.fetch_citations(pmid=article.pmid, limit=50)
            for citation in citations:
                if citation.pmid:
                    G.add_edge(citation.pmid, article.pmid)
        except Exception as e:
            print(f"Error getting citations for {article.pmid}: {e}")

    return G

# Usage
articles = client.search("graph theory", limit=20)
network = build_citation_network(articles)

# Analyze network
print(f"Network has {network.number_of_nodes()} nodes and {network.number_of_edges()} edges")

# Find most cited articles
in_degree = dict(network.in_degree())
most_cited = sorted(in_degree.items(), key=lambda x: x[1], reverse=True)[:5]

print("Most cited articles:")
for pmid, citation_count in most_cited:
    title = network.nodes[pmid].get('title', 'Unknown')
    print(f"{pmid}: {citation_count} citations - {title[:50]}...")
```
For more advanced techniques and integration examples, see the development documentation.