diff --git a/celery-client.py b/celery-client.py new file mode 100644 index 0000000..4890b6d --- /dev/null +++ b/celery-client.py @@ -0,0 +1,7 @@ +from celery_server.tasks import asyncRun + +# r = test.delay(1,2) +# r2 = test.delay(1,2) + +r = asyncRun.delay("13809213237") +print(r.get()) diff --git a/celery_server/README.MD b/celery_server/README.MD new file mode 100644 index 0000000..3ed2025 --- /dev/null +++ b/celery_server/README.MD @@ -0,0 +1,8 @@ +# Celery 异步服务器后端模块 + +## 部署 +```shell +pip install celery gevent -i https://pypi.doubanio.com/simple/ +celery -A celery_server worker -l info --pool=eventlet +``` +需要在 celery 5.0 中才能使用 async diff --git a/celery_server/__init__.py b/celery_server/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/celery_server/celery.py b/celery_server/celery.py new file mode 100644 index 0000000..9e02354 --- /dev/null +++ b/celery_server/celery.py @@ -0,0 +1,17 @@ +from celery import Celery +from celery.utils.log import get_task_logger + +app = Celery( + 'celery_server', + include=[ + 'celery_server.tasks' + ] +) +app.config_from_object( + 'celery_server.config', +) + +logger = get_task_logger(__name__) + +if __name__ == '__main__': + app.start() \ No newline at end of file diff --git a/celery_server/config.py b/celery_server/config.py new file mode 100644 index 0000000..286921c --- /dev/null +++ b/celery_server/config.py @@ -0,0 +1,11 @@ +#broker(消息中间件来接收和发送任务消息) +BROKER_URL = 'redis://localhost:6379/1' +#backend(存储worker执行的结果) +CELERY_RESULT_BACKEND = 'redis://localhost:6379/2' + +#设置时间参照,不设置默认使用的UTC时间 +CELERY_TIMEZONE = 'Asia/Shanghai' +#指定任务的序列化 +CELERY_TASK_SERIALIZER='json' +#指定执行结果的序列化 +CELERY_RESULT_SERIALIZER='json' \ No newline at end of file diff --git a/celery_server/utils.py b/celery_server/utils.py new file mode 100644 index 0000000..946f51c --- /dev/null +++ b/celery_server/utils.py @@ -0,0 +1,87 @@ +# encoding=utf8 +# 请求的方法 +from smsboom import load_getapi, load_json +from utils.log import logger +from utils.models import API +from utils import default_header +import httpx +from httpx import Limits +from typing import Union, List +import asyncio + +import sys +sys.path.append("E:\coding\SMSBoom") + + +def reqAPI(api: API, client: httpx.AsyncClient): + if isinstance(api.data, dict): + resp = client.request(method=api.method, json=api.data, + headers=api.header, url=api.url, timeout=10) + else: + resp = client.request(method=api.method, data=api.data, + headers=api.header, url=api.url, timeout=10) + return resp + + +async def asyncReqs(src: Union[API, str], phone: Union[tuple, str], semaphore): + """异步请求方法 + :param: + :return: + """ + # 多手机号支持 + if isinstance(phone, tuple): + phone_lst = [_ for _ in phone] + else: + phone_lst = [phone] + async with semaphore: + async with httpx.AsyncClient( + limits=Limits(max_connections=1000, + max_keepalive_connections=2000), + headers=default_header, + verify=False, + timeout=99999 + ) as c: + + for ph in phone_lst: + try: + if isinstance(src, API): + src = src.handle_API(ph) + r = await reqAPI(src, c) + else: + # 利用元组传参安全因为元组不可修改 + s = (src.replace(" ", "").replace("\n", "").replace("\t", "").replace( + "&", "").replace('\n', '').replace('\r', ''),) + r = await c.get(*s) + return r + except httpx.HTTPError as why: + # logger.error(f"异步请求失败{type(why)}") + pass + except TypeError: + # logger.error("类型错误") + pass + except Exception as wy: + # logger.exception(f"异步失败{wy}") + pass + +def callback(result): + """异步回调函数""" + log = result.result() + if log is not None: + # logger.info(f"请求结果:{log.text[:30]}") + print(log.text[:30]) + pass + + +async def runAsync(apis: List[Union[API, str]], phone: Union[tuple, str]): + + tasks = [] + + for api in apis: + semaphore = asyncio.Semaphore(999999) + task = asyncio.create_task(asyncReqs(api, phone, semaphore)) + task.add_done_callback(callback) + tasks.append(task) + + await asyncio.gather( + *tasks + )