pmfp.utils.url_utils 源代码

"""与url字符串相关的工具代码."""
import json
from typing import Optional, Callable
from urllib.parse import urlparse
import requests as rq
from requests.auth import HTTPBasicAuth, HTTPDigestAuth
from requests_oauthlib import OAuth1


def is_url(url: str) -> bool:
    """Report whether ``url`` parses with a non-empty scheme.

    Only the scheme is inspected; no host or path is required.

    Args:
        url (str): Candidate URL string.

    Returns:
        bool: True when ``urlparse`` finds a scheme. False when the scheme
            is empty or ``urlparse`` raises ``ValueError``.
    """
    try:
        scheme = urlparse(url).scheme
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. an unclosed "[").
        return False
    return bool(scheme)
def is_http_url(url: str) -> bool:
    """Report whether ``url`` is an http or https URL with a network location.

    Args:
        url (str): Candidate URL string.

    Returns:
        bool: True when the parsed URL has both a scheme and a netloc and
            the scheme is ``http`` or ``https``. False otherwise, including
            when ``urlparse`` raises ``ValueError``.
    """
    try:
        parts = urlparse(url)
    except ValueError:
        return False
    # Both a scheme and a host part are mandatory for an http(s) URL.
    if not parts.scheme or not parts.netloc:
        return False
    return parts.scheme in ("http", "https")
def is_file_url(url: str) -> bool:
    """Report whether ``url`` uses the ``file`` scheme.

    Args:
        url (str): Candidate URL string.

    Returns:
        bool: True when the parsed scheme equals ``file``. False otherwise,
            including when ``urlparse`` raises ``ValueError``.
    """
    try:
        scheme = urlparse(url).scheme
    except ValueError:
        return False
    # An empty scheme can never equal "file", so one comparison suffices.
    return scheme == "file"
def _request_and_dispatch(session: "rq.Session", method: str, url: str,
                          stream: bool, cb: Optional[Callable[[str], None]],
                          **request_kwargs: object) -> None:
    """Send one request on ``session`` and feed the response to ``cb``.

    When ``stream`` is exactly ``True`` the response is read line by line and
    every non-empty line is passed to ``cb``; otherwise the whole response
    text is passed to ``cb`` once. Extra keyword arguments (``data``,
    ``json``, ``params``) are forwarded to ``session.request``.
    """
    if stream is True:
        with session.request(method.upper(), url, stream=True, **request_kwargs) as res:
            # The response is always drained, even when there is no callback.
            for line in res.iter_lines(decode_unicode=True):
                if line and cb:
                    cb(line)
    else:
        res = session.request(method.upper(), url, **request_kwargs)
        if cb:
            cb(res.text)


def query_http(url: str, method: str, *,
               auth: Optional[str] = None,
               auth_type: Optional[str] = None,
               payload: Optional[str] = None,
               payload_type: Optional[str] = None,
               stream: bool = False,
               verify: bool = False,
               cert: Optional[str] = None,
               cb: Optional[Callable[[str], None]] = None) -> None:
    """Perform an HTTP request and hand the response text to a callback.

    Args:
        url (str): Address of the HTTP resource.
        method (str): HTTP method; upper-cased before the request is sent.
        auth (Optional[str], optional): Credential string. For ``basic`` and
            ``digest`` it is ``"user,password"``; for ``jwt`` it is the token;
            for ``oauth1`` it is
            ``"app_key,app_secret,oauth_token,oauth_token_secret"``.
            Defaults to None.
        auth_type (Optional[str], optional): One of ``basic``, ``digest``,
            ``jwt`` or ``oauth1``. Ignored unless ``auth`` is also given.
            Defaults to None.
        payload (Optional[str], optional): Path of the file holding the
            request payload. Defaults to None.
        payload_type (Optional[str], optional): ``stream`` sends the file as
            the raw body; ``json``, ``form`` and ``url`` load the file as JSON
            and send it as a JSON body, form data or query parameters.
            Defaults to None.
        stream (bool, optional): Read the response line by line when exactly
            ``True``. Defaults to False.
        verify (bool, optional): Copied onto the session only when truthy; a
            falsy value leaves the session's own ``verify`` setting untouched.
            Defaults to False.
        cert (Optional[str], optional): Client certificate, either one path or
            ``"cert_path,key_path"``. Defaults to None.
        cb (Optional[Callable[[str], None]], optional): Called with each
            non-empty line (streaming) or with the full response text.
            Defaults to None.

    Raises:
        AttributeError: ``auth_type`` is not supported, ``cert`` has more than
            two comma-separated parts, or ``payload_type`` is not supported.
    """
    with rq.Session() as s:
        if verify:
            s.verify = verify

        if auth_type and auth:
            # One if/elif chain: the original used a separate ``if`` for
            # "basic", so basic auth fell through to the final ``else`` and
            # always raised.
            if auth_type == "basic":
                user, pwd = auth.split(",")
                s.auth = HTTPBasicAuth(user, pwd)
            elif auth_type == "digest":
                user, pwd = auth.split(",")
                s.auth = HTTPDigestAuth(user, pwd)
            elif auth_type == "jwt":
                # NOTE(review): this replaces the session's whole header set,
                # not just the Authorization entry - confirm this is intended.
                s.headers = rq.structures.CaseInsensitiveDict({"Authorization": "Bearer " + auth})
            elif auth_type == "oauth1":
                app_key, app_secret, oauth_token, oauth_token_secret = auth.split(",")
                s.auth = OAuth1(app_key, app_secret, oauth_token, oauth_token_secret)
            else:
                raise AttributeError(f"auth_type 参数 {auth_type} 目前不支持")

        if cert:
            cert_list = cert.split(",")
            cert_list_len = len(cert_list)
            if cert_list_len == 1:
                s.cert = cert_list[0]
            elif cert_list_len == 2:
                s.cert = (cert_list[0], cert_list[1])
            else:
                raise AttributeError(f"cert 参数 {cert} 不合法")

        if payload is None:
            _request_and_dispatch(s, method, url, stream, cb)
        elif payload_type == "stream":
            # The file must stay open while the request body is being sent.
            with open(payload, "rb") as f:
                _request_and_dispatch(s, method, url, stream, cb, data=f)
        else:
            # The payload file is parsed before the type check, so a missing
            # or invalid file is reported ahead of an unsupported type.
            with open(payload, "r", encoding='utf-8') as fu:
                payload_dict = json.load(fu)
            request_keyword = {"json": "json", "form": "data", "url": "params"}.get(payload_type)
            if request_keyword is None:
                raise AttributeError(f"不支持的负载类型{payload_type}")
            _request_and_dispatch(s, method, url, stream, cb, **{request_keyword: payload_dict})
def parse_file_url(url: str) -> str:
    """Extract the filesystem path from a ``file`` URL.

    A path of the form ``/C:/dir/file`` has its leading slash removed so the
    result starts with the drive letter. Every other path is returned
    unchanged, including POSIX paths that contain a colon.

    Args:
        url (str): A ``file`` URL.

    Returns:
        str: The path component, without the leading slash when that slash
            directly precedes a drive letter and colon.
    """
    path_str = urlparse(url).path
    # Strip the slash only for "/<ascii letter>:". Testing for ":" anywhere
    # would also mangle paths such as "/tmp/a:b.txt" or "C:/dir".
    if len(path_str) >= 3 and path_str[0] == "/" and path_str[2] == ":":
        drive = path_str[1]
        if "A" <= drive <= "Z" or "a" <= drive <= "z":
            return path_str[1:]
    return path_str
def parse_http_url(url: str) -> str:
    """Return the path component of a URL.

    The query string and fragment are not part of the result.

    Args:
        url (str): URL to parse.

    Returns:
        str: The ``path`` attribute reported by ``urlparse``; empty when the
            URL has no path.
    """
    return urlparse(url).path
def get_source_from_url(url: str) -> str:
    """Fetch the text content behind an http(s) or file URL.

    http(s) URLs are fetched with a plain GET request; file URLs are read
    from disk as UTF-8 text.

    Args:
        url (str): URL of the resource.

    Raises:
        AttributeError: The HTTP response status is not 200.
        AttributeError: The URL is neither an http(s) URL nor a file URL.

    Returns:
        str: The response body or the file content.
    """
    if is_http_url(url):
        response = rq.get(url)
        if response.status_code == 200:
            return response.text
        raise AttributeError(f"url {url} 未成功返回")

    if is_file_url(url):
        with open(parse_file_url(url), "r", encoding='utf-8') as source_file:
            return source_file.read()

    raise AttributeError(f"url {url} 未支持的类型")