{# 2025-02-14 17:42:36 +08:00 #}
{% set pkField = pkColumn.python_field %}
{% set pk_field = pkColumn.python_field | camel_to_snake %}
{# 2025-02-16 23:29:15 +08:00 #}
{% set pkParentheseIndex = pkColumn.column_comment.find("( ") %}
{% set pk_field_comment = pkColumn.column_comment[:pkParentheseIndex] if pkParentheseIndex != -1 else pkColumn.column_comment %}
{# 2025-02-17 17:43:29 +08:00 #}
{% if dicts %}
from fastapi import Request
{% endif %}
{# 2025-02-14 17:42:36 +08:00 #}
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from config.constant import CommonConstant
from exceptions.exception import ServiceException
from module_admin.entity.vo.common_vo import CrudResponseModel
{# 2025-02-17 17:43:29 +08:00 #}
{% if dicts %}
from module_admin.service.dict_service import DictDataService
{% endif %}
from {{ packageName }}.dao.{{ businessName }}_dao import {{ BusinessName }}Dao
{# 2025-02-14 17:42:36 +08:00 #}
from {{ packageName }}.entity.vo.{{ businessName }}_vo import Delete{{ BusinessName }}Model, {{ BusinessName }}Model, {{ BusinessName }}PageQueryModel
from utils.common_util import CamelCaseUtil
from utils.excel_util import ExcelUtil
class {{ BusinessName }}Service:
    """
    Service layer for the {{ functionName }} module
    """

{# Template note: Jinja block tags are kept at column 0 so the rendered Python indentation does not depend on the environment's lstrip_blocks setting. #}
    @classmethod
    async def get_{{ businessName }}_list_services(
        cls, query_db: AsyncSession, query_object: {{ BusinessName }}PageQueryModel, is_page: bool = False
    ):
        """
        Get the {{ functionName }} list

        :param query_db: orm session
        :param query_object: query parameter object
        :param is_page: whether pagination is enabled
        :return: {{ functionName }} list result returned by the dao
        """
        {{ businessName }}_list_result = await {{ BusinessName }}Dao.get_{{ businessName }}_list(query_db, query_object, is_page)

        return {{ businessName }}_list_result

{# One uniqueness-check method is generated per column flagged as unique. #}
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("( ") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.unique %}
    @classmethod
    async def check_{{ column.python_field | camel_to_snake }}_unique_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
        """
        Check whether {{ comment }} is unique

        :param query_db: orm session
        :param page_object: {{ functionName }} object
        :return: CommonConstant.UNIQUE or CommonConstant.NOT_UNIQUE
        """
        # A missing primary key (new record) is compared as -1 so any existing match counts as a conflict.
        {{ pk_field }} = -1 if page_object.{{ pk_field }} is None else page_object.{{ pk_field }}
        {{ businessName }} = await {{ BusinessName }}Dao.get_{{ businessName }}_detail_by_info(
            query_db, {{ BusinessName }}Model({{ column.python_field }}=page_object.{{ column.python_field | camel_to_snake }})
        )
        if {{ businessName }} and {{ businessName }}.{{ pk_field }} != {{ pk_field }}:
            return CommonConstant.NOT_UNIQUE
        return CommonConstant.UNIQUE

{% endif %}
{% endfor %}
    @classmethod
    async def add_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
        """
        Add a {{ functionName }} record

        :param query_db: orm session
        :param page_object: {{ functionName }} object to add
        :return: CrudResponseModel describing the result
        :raises ServiceException: if a generated uniqueness check fails
        """
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("( ") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.unique %}
        if not await cls.check_{{ column.python_field | camel_to_snake }}_unique_services(query_db, page_object):
            raise ServiceException(message=f'新增{{ functionName }}{page_object.{{ column.python_field | camel_to_snake }}}失败,{{ comment }}已存在')
{% endif %}
{% endfor %}
        try:
{% if table.sub %}
            add_{{ businessName }} = await {{ BusinessName }}Dao.add_{{ businessName }}_dao(query_db, page_object)
            if add_{{ businessName }}:
                # `or []` tolerates a request that carries no child rows (list is None).
                for sub_table in page_object.{{ subclassName }}_list or []:
                    await {{ BusinessName }}Dao.add_{{ subTable.business_name }}_dao(query_db, sub_table)
{% else %}
            await {{ BusinessName }}Dao.add_{{ businessName }}_dao(query_db, page_object)
{% endif %}
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='新增成功')
        except Exception:
            # Undo the partial write, then re-raise with the original traceback.
            await query_db.rollback()
            raise

    @classmethod
    async def edit_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: {{ BusinessName }}Model):
        """
        Edit a {{ functionName }} record

        :param query_db: orm session
        :param page_object: {{ functionName }} object to edit
        :return: CrudResponseModel describing the result
        :raises ServiceException: if the record does not exist or a generated uniqueness check fails
        """
        # Only fields explicitly set by the caller are written; the child list and non-editable columns are excluded.
        edit_{{ businessName }} = page_object.model_dump(exclude_unset=True, exclude={% raw %}{{% endraw %}{% if table.sub %}'{{ subclassName }}_list', {% endif %}{% for column in columns %}{% if not column.edit and not column.pk %}'{{ column.python_field | camel_to_snake }}'{% if not loop.last %}, {% endif %}{% endif %}{% endfor %}{% raw %}}{% endraw %})
        {{ businessName }}_info = await cls.{{ businessName }}_detail_services(query_db, page_object.{{ pk_field }})
        if not {{ businessName }}_info.{{ pk_field }}:
            raise ServiceException(message='{{ functionName }}不存在')
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("( ") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
{% if column.unique %}
        if not await cls.check_{{ column.python_field | camel_to_snake }}_unique_services(query_db, page_object):
            raise ServiceException(message=f'修改{{ functionName }}{page_object.{{ column.python_field | camel_to_snake }}}失败,{{ comment }}已存在')
{% endif %}
{% endfor %}
        try:
            await {{ BusinessName }}Dao.edit_{{ businessName }}_dao(query_db, edit_{{ businessName }})
{% if table.sub %}
            # `or []` tolerates a request that carries no child rows (list is None).
            for sub_table in page_object.{{ subclassName }}_list or []:
                await {{ BusinessName }}Dao.edit_{{ subTable.business_name }}_dao(query_db, sub_table)
{% endif %}
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='更新成功')
        except Exception:
            # Undo the partial write, then re-raise with the original traceback.
            await query_db.rollback()
            raise

    @classmethod
    async def delete_{{ businessName }}_services(cls, query_db: AsyncSession, page_object: Delete{{ BusinessName }}Model):
        """
        Delete {{ functionName }} records

        :param query_db: orm session
        :param page_object: delete request carrying a comma-separated list of primary keys
        :return: CrudResponseModel describing the result
        :raises ServiceException: if no primary key was supplied
        """
        if not page_object.{{ pk_field }}s:
            raise ServiceException(message='传入{{ pk_field_comment }}为空')
        {{ pk_field }}_list = page_object.{{ pk_field }}s.split(',')
        try:
            for {{ pk_field }} in {{ pk_field }}_list:
{% if table.sub %}
                # Child rows are looked up and removed BEFORE the parent delete is issued;
                # looking the parent up afterwards would query a row already deleted in this session.
                {{ businessName }} = await cls.{{ businessName }}_detail_services(query_db, int({{ pk_field }}))
                for sub_table in {{ businessName }}.{{ subclassName }}_list or []:
                    await {{ BusinessName }}Dao.delete_{{ subTable.business_name }}_dao(query_db, sub_table)
{% endif %}
                await {{ BusinessName }}Dao.delete_{{ businessName }}_dao(query_db, {{ BusinessName }}Model({{ pkField }}={{ pk_field }}))
            await query_db.commit()
            return CrudResponseModel(is_success=True, message='删除成功')
        except Exception:
            # Undo the partial delete, then re-raise with the original traceback.
            await query_db.rollback()
            raise

    @classmethod
    async def {{ businessName }}_detail_services(cls, query_db: AsyncSession, {{ pk_field }}: int):
        """
        Get the details of a {{ functionName }} record

        :param query_db: orm session
        :param {{ pk_field }}: {{ pk_field_comment }}
        :return: model for the matching record, or an empty model when nothing is found
        """
        {{ businessName }} = await {{ BusinessName }}Dao.get_{{ businessName }}_detail_by_id(query_db, {{ pk_field }}={{ pk_field }})
        if {{ businessName }}:
            return {{ BusinessName }}Model(**CamelCaseUtil.transform_result({{ businessName }}))
        # No match: callers test the primary key of this empty model to detect absence.
        return {{ BusinessName }}Model()

    @staticmethod
    async def export_{{ businessName }}_list_services({% if dicts %}request: Request, {% endif %}{{ businessName }}_list: List):
        """
        Export {{ functionName }} records to excel

{% if dicts %}
        :param request: current request, used to reach the redis client on the app state
{% endif %}
        :param {{ businessName }}_list: list of {{ functionName }} records
        :return: value returned by ExcelUtil.export_list2excel for the records
        """
        # Map each field key to the label taken from its column comment.
        mapping_dict = {
{% for column in columns %}
{% set parentheseIndex = column.column_comment.find("( ") %}
{% set comment = column.column_comment[:parentheseIndex] if parentheseIndex != -1 else column.column_comment %}
            '{{ column.python_field }}': '{{ comment }}',
{% endfor %}
        }
{% if dicts %}
{% for dict_type in dicts.split(", ") %}
        {{ dict_type[1:-1] }}_list = await DictDataService.query_dict_data_list_from_cache_services(
            request.app.state.redis, dict_type={{ dict_type }}
        )
        {{ dict_type[1:-1] }}_option = [dict(label=item.get('dictLabel'), value=item.get('dictValue')) for item in {{ dict_type[1:-1] }}_list]
        {{ dict_type[1:-1] }}_option_dict = {item.get('value'): item for item in {{ dict_type[1:-1] }}_option}
{% endfor %}
        # Replace stored dictionary values with their labels before exporting.
        for item in {{ businessName }}_list:
{% for column in columns %}
{% if column.dict_type %}
            if str(item.get('{{ column.python_field }}')) in {{ column.dict_type }}_option_dict:
                item['{{ column.python_field }}'] = {{ column.dict_type }}_option_dict.get(str(item.get('{{ column.python_field }}'))).get('label')
{% endif %}
{% endfor %}
{% endif %}
        binary_data = ExcelUtil.export_list2excel({{ businessName }}_list, mapping_dict)

        return binary_data