“”"第一步定时执行 ok
第二步收取邮件 ok
收取指定邮箱的邮件:zibofandianzha@163.com,获取附件第一列是号码。
第三步数据导入
在134.35.10.10上布置服务端(因为需要使用生产系统数据库)
第四步数据生成
第五步邮件派发 ok
将表格作为附件给 zibofandianzha@163.com 反馈。
“”"
from email.parser import HeaderParser
import poplib,email,telnetlib
import datetime,time,sys,traceback
from email.parser import Parser
from email.header import decode_header
from email.header import Header
from email.utils import parseaddr
import openpyxl
import xlrd
import pandas as pd
import cx_Oracle #Oracle 数据库连接
import imaplib
import smtplib #发送邮件模块
from email.mime.text import MIMEText #定义邮件内容
from email.mime.multipart import MIMEMultipart #用于传送附件
import os#时间计算
def get_current_time(input_date =‘0’):
#如果时间传入为空
if input_date ==‘0’:
ct = time.time()-246060
local_time = time.localtime(ct)
data_head = time.strftime(“%Y%m%d%H%M%S”, local_time)
data_secs = abs(ct - round(ct)) * 1000
time_stamp = “%s%03d” % (data_head, data_secs)
else:
time_stamp = input_date +‘120000001’
return time_stamp#邮箱配置文件
def email_type():
wb = openpyxl.load_workbook(‘./xx数据/邮箱配置文件.xlsx’)
print(‘email_type’,‘邮箱配置文件’)
ws = wb.active
minr = ws.min_row
minc = ws.min_column
maxr = ws.max_row
maxc = ws.max_column
# 数据库内容
#print(minr, minc, maxr, maxc)
rngs = ws.iter_rows(min_row=minr, min_col=minc,
max_row=maxr, max_col=maxc)
row_cs = 0
for row in rngs:
if row_cs == 1:
value = [c.value for c in row]
user = value[0]
password = value[1]
eamil_server = value[2]
smtpserver = value[3]
receives = value[4].replace(" “,”“).replace(”[“,”“).replace(”]“,”“).replace(”'“,”“).split(”,")
from_id = value[5]
row_cs += 1
return user,password,eamil_server,smtpserver,receives,from_iddef mysql_execute(in_sql, leixing):
# 登录数据库
#conn = pymysql.connect(host=‘127.0.0.1’, port=3306, user=‘szc’, password=‘szcNSP850219’, database=‘szc_sql’,
# charset=‘utf8’)
dsn = "134.80.200.216/pdbzbjs1"
conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8")
# 得到一个可以执行SQL语句的光标对象
cursor = conn.cursor()
# 数据库执行导入的语句
if leixing == '数量':
# 反馈数量
count = cursor.execute(in_sql)
elif leixing == '单条':
# 反馈单条
cursor.execute(in_sql)
count = cursor.fetchone()[0]
elif leixing == '多条':
# 反馈多条
cursor.execute(in_sql)
count = cursor.fetchall()
elif leixing == '编辑':
count = cursor.execute(in_sql)
conn.commit()
# 关闭光标对象
cursor.close()
# 关闭数据库连接
conn.close()
# 反馈
return count
#因为xx提供的数据是xls的,需要使用xlrd
def enclosure_handle():
wb = xlrd.open_workbook('./xx数据/xx数据.xls')
#读取第一个表格内容
ws = wb.sheet_by_index(0)
ccol_datas = ws.col_values(0)
ccol_cs = len(ccol_datas)
if ccol_cs == 0:ccol_cs=1
ccol_num = 0
#数据写入前先清空数据表,以免有其他号码掺杂其中
sql = "delete from shzc.lw_xdjl_zhsy_hm_drb "
mysql_execute(sql, '编辑')
#遍历数据内容导入数据库
for ccol_data in ccol_datas:
if ccol_data is not None or ccol_data != '':
xls_to_sql(ccol_data[0:11])
ccol_num += 1
if ccol_num in (1,round(ccol_cs*0.33,0),round(ccol_cs*0.66,0),round(ccol_cs*1,0)) :
print('xx总数据:', ccol_cs,' 已完成:',ccol_num,' 完成比例:',str(round(ccol_num/ccol_cs,2)*100)+'%')
continue
#最后一步将导入号码时间改为当前时间
sql = "update shzc.lw_xdjl_zhsy_hm_drb a set a.in_time = sysdate, a.servnumber = trim(a.servnumber) where a.in_time is null"
mysql_execute(sql, '编辑')
#执行完成后返回 成功
return 'ok'
#专门用来将号码导入数据库的函数
def xls_to_sql(servnumber):
sql = "insert into shzc.lw_xdjl_zhsy_hm_drb (servnumber,user_id) values(‘%s’,‘%s’) " % (servnumber, ‘liuwei’)
mysql_execute(sql, ‘编辑’)#号码数据写入数据库后,就可以运行计算过程,生成表格等待发送
def sql_handle():
try:
sql = "drop table SHZC.lw_xdjl_zhsy_hm_sja "
mysql_execute(sql, '编辑')
except:
pass
#根据 shzc.lw_xdjl_zhsy_hm_drb 表里号码生成数据
sql = "create table SHZC.lw_xdjl_zhsy_hm_sja as select t.user_id,t.in_time,t.SERVNUMBER,b.subsid,b.acctid,pp.prodname,B.STATUS,zt.dictname 状态,b.statusdate 状态时间,b.STOPKEY,b.createdate 入网时间,b.settleday 账期日,B.ENUM,B.IMSI,B.REGISTERORGID,B.OWNERORGID,row_number() over (partition by b.servnumber order by nvl(b.createdate,sysdate-9999) desc ) 排名 from shzc.lw_xdjl_zhsy_hm_drb t,(select * from zhyw.subscriber b where b.status not in ('US26','US28') ) B ,(select * from tbcs.dict_item@bcv where groupid='US') zt ,tbcs.product@bcv pp where zt.dictid(+)=b.status and pp.prodid(+)=b.prodid and t.servnumber=b.servnumber(+)"
mysql_execute(sql, '编辑')
try:
sql = "drop table SHZC.lw_xdjl_zhsy_hm_sjb"
mysql_execute(sql, '编辑')
except:
pass
sql = "create table SHZC.lw_xdjl_zhsy_hm_sjb as select * from SHZC.lw_xdjl_zhsy_hm_sja a where a.排名 = 1 "
mysql_execute(sql, '编辑')
#清空导入表
sql = "delete shzc.lw_xdjl_zhsy_hm_drb "
mysql_execute(sql, '编辑')
try:
sql = "drop table SHZC.lw_xdjl_zhsy_hm_sjc "
mysql_execute(sql, '编辑')
except:
pass
sql = "create table SHZC.lw_xdjl_zhsy_hm_sjc as select a.user_id, a.in_time, a.servnumber, a.subsid, a.prodname, a.状态, a.入网时间, a.ownerorgid, b.huadan_typename, b.roamcity_id, c.provincename, c.areaname, b.COUNTY_ID,b.lac_id, b.cell_id, b.bts_name, b.roamcity_name, b.geo_longitude, b.geo_latitude from SHZC.lw_xdjl_zhsy_hm_sjb a,zibo.lw_sasz_xdjl_zhsy_bd b,shzc.yqjk_fxmyd_sjzb c where a.servnumber = b.product_no(+) and b.roamcity_id = c.areaid(+)"
mysql_execute(sql, '编辑')
try:
sql = "drop table SHZC.lw_xdjl_zhsy_hm_sjd "
mysql_execute(sql, '编辑')
except:
pass
sql = "create table SHZC.lw_xdjl_zhsy_hm_sjd as select a.user_id,a.in_time,a.servnumber,a.subsid,a.prodname,a.状态,a.入网时间,a.ownerorgid,a.huadan_typename,a.provincename,nvl(a.roamcity_name, a.areaname) roamcity_name,a.COUNTY_ID,a.lac_id,a.cell_id,nvl(a.bts_name, b.name) bts_name,a.geo_longitude,a.geo_latitude from SHZC.lw_xdjl_zhsy_hm_sjc a,zhyw.rpt_county b where substr(a.ownerorgid,8,1)=b.county_id(+)"
mysql_execute(sql, '编辑')
#插入记录表
sql = "insert into SHZC.lw_xdjl_zhsy_hm_sjbd select A.USER_ID, A.IN_TIME,A.SERVNUMBER,A.SUBSID,A.PRODNAME,A.状态,A.入网时间,A.OWNERORGID,A.HUADAN_TYPENAME,A.PROVINCENAME,A.ROAMCITY_NAME,a.COUNTY_ID,A.LAC_ID,A.CELL_ID,A.BTS_NAME,A.GEO_LONGITUDE,A.GEO_LATITUDE from SHZC.lw_xdjl_zhsy_hm_sjd a"
mysql_execute(sql, '编辑')
#结果数据生成表格准备发送
sql = sql = "select A.SERVNUMBER 号码,A.状态,nvl(case when a.roamcity_name is null then '淄博' end,a.roamcity_name)||'_'||nvl(case when nvl(a.状态,'0')<>'正使用' then nvl(a.county_id,a.bts_name) when nvl(a.roamcity_name,'0')='淄博' then nvl(a.county_id, a.bts_name) end,'不详') 使用区县 from SHZC.lw_xdjl_zhsy_hm_sjd a"
#上面语句部分代码MySQL不支持,等正式使用在测试
#sql = "select A.SERVNUMBER 号码,A.状态,a.roamcity_name 使用区县 from lw_xdjl_zhsy_hm_sjd a"
#conn_new = pymysql.connect(host='127.0.0.1', port=3306, user='szc', passwd='szcNSP850219', db='szc_sql',charset='utf8')
dsn = "134.80.200.216/pdbzbjs1"
conn = cx_Oracle.connect(user="zbweb", password="zibo_533_03", dsn=dsn, encoding="UTF-8")
df = pd.read_sql("""%s""" % sql, con=conn)
df.to_excel("./xx数据/xx结果下载.xlsx", index=False)
return '数据下载完成'
#邮件下载
class down_email():
def __init__(self,user,password,eamil_server,input_date,from_id):
# 输入邮件地址, 口令和POP3服务器地址:
self.user = user
# 此处密码是授权码,用于登录第三方邮件客户端
self.password = password
self.pop3_server = eamil_server
self.input_date = input_date
self.from_id = from_id
print('邮箱用户:',self.user,'邮箱密码:',self.password,'邮箱服务:', self.pop3_server)
def parseBody(self,message):
# 解析邮件内容
# 循环信件中的每一个mime的数据块
for part in message.walk():
# 这里要判断是否是multipart,是的话,里面的数据是一个message 列表
if not part.is_multipart():
charset = part.get_charset()
contenttype = part.get_content_type()
name = part.get_param("name") # 如果是附件,这里就会取出附件的文件名
#print('如果是附件,这里就会取出附件的文件名',name)
if name: # 在这里额外添加中国移动的条件,judgedate(name)==1
fh = email.header.Header(name)
fdh = email.header.decode_header(fh)
fname = fdh[0][0]
attach_data = part.get_payload(decode=True) # 解码出附件数据,然后存储到文件中
try:
f = open('./xx数据/xx数据.xls', 'wb') # 注意一定要用wb来打开文件,因为附件一般都是二进制文件
except:
f = open('./xx数据/xx数据.xls', 'wb')
f.write(attach_data)
f.close()
else:
fname = 'a.xls'
return str(fname).split("'")[1]
def run_ing(self):
str_day = str(datetime.date.today()) # 日期赋值
print('日期赋值',str_day)
if self.pop3_server[0:3] == 'ima' or self.pop3_server[0:3] == 'IMA':
try:
telnetlib.Telnet(self.pop3_server, 993)
self.server = imaplib.IMAP4_SSL(self.pop3_server, 993)
except:
time.sleep(50)
self.server = imaplib.IMAP4(self.pop3_server, 110)
self.server.login(self.user, self.password)
self.server.select()
# 搜索邮件内容 shazhicheng@sd.chinamobile.com
typ, data = self.server.search(None, 'ALL')
# 搜索邮件内容
data_a =str(data[0]).replace('b','').replace("'","").replace(' ',',').split(",")
count = len(data_a)
data_b = 9999
pcount = 0
subject_cs = 0
for num in data_a[::-1]:
#print('num',num)
if num == '' or num is None:
continue
typ, data_id = self.server.fetch(num, '(RFC822)')
text = data_id[0][1]
message = email.message_from_bytes(text) # 转换为email.message对象
yj_from = message.get('From')
yj_from_name = yj_from[yj_from.index('<')+1:-1]
print('发件人邮箱地址:',yj_from_name,'目标邮箱地址:',self.from_id)
try:
#防止邮件时间报错
yj_date = str(message.get('Date')[0:28])
except:
yj_date =get_current_time(self.input_date)[0:28]
# 检查 发送人邮箱地址,符合的反馈,self.from_id 提取自配置文件的目标邮箱
if yj_from_name == self.from_id:
# 检测是否已经发送过邮件,没有发送过的话才发送
sql = sql = "SELECT count(Email_title) FROM zbweb.customer_service_title where Email_title='%s' and in_time='%s'" % ( 'ga_sasz_mail_lcxh', yj_date)
num = mysql_execute(sql, '单条')
print('数据库中该邮件是否已发送过', num)
#没处理过的提取附件,附件提取成功的就修改 data_b 使程序进行
if num == 0:
# 没处理过的话,先插入表中,如果没有附件也不处理了,以免邮箱检索卡住
# 将标题与时间计入数据库,以防下次再处理
sql = "insert into zbweb.customer_service_title (Email_title,IN_TIME) values('%s','%s') " % (
'ga_sasz_mail_lcxh', yj_date)
num = mysql_execute(sql, '编辑')
# subject_cs 赋值
subject_cs += 1
f = self.parseBody(message) # 返回值是为了把文件名返回主函数
if f.strip() != '':
data_b = subject_cs
break
# 如果检索邮件数大于总邮件数就跳出,或者大于50就跳出,以免异常
pcount += 1
#print('pcount', pcount, 'count', count)
if pcount > count or pcount >= 10:
break
else:
continue
self.server.close()
self.server.logout()
#如果 data_b 没有经过重新赋值,说明没有附件,就给他0,让下面附件处理不运行
if data_b == 9999:
data_b = 0
return data_b # 返回值是为了把文件名返回主函数
#邮件推送
def email_out(user,password,smtpserver,sender,receives,title,content):
‘’’
:param user: 发送邮箱用户名
:param password:发送邮箱密码
:param smtpserver:发送服务器
:param sender:发送方邮箱
:param receives:接收方邮箱
:param title:邮件标题
:param content:邮件正文
:return:
‘’’
#邮件附件最后带日期,如果附件没有带点,就原名发出
# 发送邮件主题和内容
subject = title
#文字描述中 \n 替换成 <br/>,头尾
content_head = '<html><strong>尊敬的领导,您好:</strong><br/><br/> '
content_tail = '<br/><h4> 此致<br/>敬礼<br/></h4> <div align=right> <strong> 山东移动淄博分公司 </strong> </div>'
#content.replace('\n','<br/>').replace('^', ' ') 正文描述中的\n替换成换行,^还原成空格
content_new = content_head + content.replace('\n','<br/>').replace('^', ' ') + content_tail
send_file_path = r".\xx数据\xx结果下载.xlsx"
send_file_path = open(send_file_path, 'rb').read() # 'rb'表示r读取,b表示二进制方式读取
att = MIMEText(send_file_path, 'base64', 'utf-8') # 调用传送附件模块,传送附件
att["Content-Type"] = 'application/octet-stream'
# file_name 主题名加原来文件名后缀
file_name = 'xx查询结果反馈.xlsx'
filename = Header(file_name,'utf-8').encode()
att["Content-Disposition"] = 'attachment;filename=%s' % Header(file_name,'utf-8').encode() # 附件描述外层要用单引号
# 构建发送与接收信息
msgRoot = MIMEMultipart() # 发送附件的方法定义为一个变量
msgRoot.attach(MIMEText(content_new, 'html', 'utf-8')) # 发送附件的方法中嵌套发送正文的方法
msgRoot['subject'] = subject
#接收人显示内容
msgRoot['From'] = sender
msgRoot['Bcc'] = ','.join(receives)
msgRoot.attach(att) # 添加附件到正文中
# SSL协议端口号要使用465
smtp = smtplib.SMTP_SSL(smtpserver, 465)
# HELO 向服务器标识用户身份
smtp.helo(smtpserver)
# 服务器返回结果确认
smtp.ehlo(smtpserver)
# 登录邮箱服务器用户名和密码
smtp.login(user, password)
smtp.sendmail(sender, receives, msgRoot.as_string())
smtp.quit()
return "发送成功"
if name == ‘main’:
input_date = '0'
# 循环执行
while True:
in_time = get_current_time(input_date)
in_day = in_time[0:8]
# 输入邮件地址, 口令和POP3服务器地址:
user, password, eamil_server, smtpserver, receives,from_id = email_type()
#获取邮箱类
email_class = down_email(user=user, password=password, eamil_server=eamil_server, input_date=input_date,from_id=from_id)
#调用 email_class.run_ing() 获取邮件附件
try:
# 接收邮件并判断是否有符合的结果
email_list = email_class.run_ing()
#print('正常 email_list:', email_list)
except:
email_list = 0
#print('异常 email_list:', email_list)
#临时邮件接收不执行,先调试处理部分。正式运行时 下面 email_list = 1 需屏蔽
#email_list = 1
#定义一个状态,看看附件内容是否正常处理了,没处理就不执行下面内容
handle_statr = 'not'
#如果获取一个正常附件
if email_list == 1 :
print('对附件内容进行解析处理')
try:
handle_statr = enclosure_handle()
except:
pass
#查看 状态
print('handle_statr',handle_statr)
#handle_statr == 'ok'
#执行结果正常,可以处理后续数据,这里也单独弄一个函数运行
sql_statr = '数据下载失败'
if handle_statr =='ok':
try:
sql_statr = sql_handle()
except:
sql_statr = '数据下载失败'
# 查看 状态
print('sql_statr', sql_statr)
if sql_statr =='数据下载完成':
title ='淄博移动防电诈号码数据查询'
content = '淄博移动防电诈号码数据查询,请您过目'
try:
email_out_statr = email_out(user,password,smtpserver,user,receives,title,content)
print(in_time,'email_out_statr',email_out_statr)
except:
print(in_time,'email_out_statr','发送失败')
# 执行后延时60秒
time.sleep(60)