python初学者学习简单教程
发布时间
阅读量:
阅读量
一、编写理念
- 很多教程虽然看似系统,但初学者耗时长却难以理解所学用途.本文引导读者迅速掌握核心内容,并采用少而精的学习方式,从而实现真正掌握python编程的核心目标.
- 这种做法并非全面讲解python语言,而是重点突出核心内容,并结合实际案例进行操作演示.对于其他领域相关知识,则需通过持续实践来逐步深入掌握.
- 本文示例代码力求简短实用,帮助读者以最高效的方式掌握基础技能;而不做理论教学相关内容的展开,如有疑问可留言进一步交流.
读者想学什么可以留言给我,我会考虑编写对应章节。
二、开发环境
为了更好地帮助大家掌握相关知识而准备的内容

三、基础知识
3.1、入门
if __name__ == '__main__':
print("Hello World!")
3.2、变量及注释
if __name__ == '__main__':
# 这是一个注释
str = "Hello World!"
print(str)
3.3、条件控制
if __name__ == '__main__':
str = "AAA"
if str == "AAA":
print("Yes")
else:
print("No")
print(str)
3.4、循环控制(for)
if __name__ == '__main__':
for i in range(1, 10):
print(i)
3.5、循环控制(while)
if __name__ == '__main__':
i = 1
while i <= 10:
print(i)
i = i + 1
3.6、函数
def print_num(min, max):
i = min
while i <= max:
print(i)
i = i + 1
if __name__ == '__main__':
print_num(1, 10)
3.7、数据结构
3.7.1、列表
if __name__ == '__main__':
l = [10, 20, 30, 40, 50, 60, 70, 80, 90]
#截取列表一部分
print(l[0:4])
#遍历列表
for x in l:
print(x, end = ",")
print()
#追加1000
l.append(1000)
print(l)
#删除30
del l[2]
print(l)
#列表长度
print(len(l))
3.7.2、字典
dict = {'Name': 'Runoob', 'Age': 7, 'Class': 'First'}
print ("dict['Name']: ", dict['Name'])
print ("dict['Age']: ", dict['Age'])
dict['Age'] = 8 # 更新 Age
dict['School'] = "菜鸟教程" # 添加信息
print ("dict['Age']: ", dict['Age'])
print ("dict['School']: ", dict['School'])
3.8、类和对象
#类定义
class people:
#定义基本属性
name = ''
age = 0
#定义私有属性,私有属性在类外部无法直接进行访问
__weight = 0
#定义构造方法
def __init__(self, n, a, w):
self.name = n
self.age = a
self.__weight = w
def speak(self):
print("%s 说: 我 %d 岁。" %(self.name,self.age))
if __name__ == '__main__':
# 实例化类
p = people('runoob',10,30)
p.speak()
print("name = {0}, age = {1}".format(p.name, p.age))
3.9、模块
# 文件名: support.py
def print_func( par ):
print ("Hello : ", par)
return
# 文件名: test.py
# 导入模块
import support
# 现在可以调用模块里包含的函数了
support.print_func("Runoob")
3.10、错误
while True print('Hello world')
File "<stdin>", line 1, in ?
while True print('Hello world')
^
SyntaxError: invalid syntax
3.11、异常
while True:
try:
x = int(input("请输入一个数字: "))
break
except ValueError as err:
print("您输入的不是数字,请再次尝试输入:", err)
finally:
print("无论出现什么异常都会执行到这里!")
四、常用第三方库
4.1、安装第三方库
打开工具-管理包菜单,在其中添加要安装的第三方库名并进行查找。找到后选择并完成该第三方库的安装。

4.2、协程使用
请参考官方文档:https://www.apiref.com/python-zh/library/asyncio.html
4.2.1、入门
import asyncio
async def main():
print('Hello ...')
await asyncio.sleep(1)
print('... World!')
# Python 3.7+
asyncio.run(main())
4.2.2、多任务
import asyncio
import time
async def task(task_name, sleep_time):
while True:
print(task_name)
await asyncio.sleep(sleep_time)
if __name__ == '__main__':
dltasks = set()
dltasks.add(asyncio.ensure_future(task("task1", 5)))
dltasks.add(asyncio.ensure_future(task("task2", 8)))
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(dltasks))
4.2.3、调用阻塞函数
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor
class aioTest:
def __init__(self):
self.__executor = ThreadPoolExecutor(3)
self.__loop = asyncio.get_event_loop()
def __print_time(self, threadName, delay):
print("print_time start")
count = 0
while count < 5:
print("count = %s" % count)
time.sleep(delay)
count += 1
sRet = threadName + ":" + time.ctime(time.time())
print("print_time end")
return sRet
def print_time(self, threadName, delay):
return self.__loop.run_in_executor(self.__executor, self.__print_time, threadName, delay)
async def main():
at = aioTest()
print('suspending the coroutine')
result = await at.print_time("Thread-1", 1)
return result
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
4.3、文件读写&Json读写
# -*- coding: utf-8 -*-
import asyncio
import aiofiles
import json
async def main():
txtFileName = "xx.txt"
json_data = {
'no' : 1,
'name' : 'Runoob',
'url' : 'http://www.runoob.com'
}
async with aiofiles.open(txtFileName, "w", encoding="UTF-8") as fp:
json_str = json.dumps(json_data, indent=4, ensure_ascii=False)
await fp.write(json_str)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# -*- coding: utf-8 -*-
import asyncio
import aiofiles
import json
async def main():
txtFileName = "xx.txt"
json_data = {}
async with aiofiles.open(txtFileName , 'r', encoding='utf-8') as fp:
json_str = await fp.read()
json_data = json.loads(json_str)
print(json_data)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
4.4、网络通讯
4.4.1、TCP/IP
4.4.1.1、服务端
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addr = server.sockets[0].getsockname()
print(f'Serving on {addr}')
async with server:
await server.serve_forever()
if __name__ == '__main__':
asyncio.run(main())
4.4.1.2、客户端
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
if __name__ == '__main__':
asyncio.run(tcp_echo_client('Hello World!'))
4.4.1.3、TCP端口映射
#!/usr/bin/python3
#-*- coding: utf-8 -*-
import asyncio
import hexdump
class tcpproxy:
def __init__(self, remote_ip, remote_port):
self.__remote_ip = remote_ip
self.__remote_port = remote_port
async def __copy_data_thread(self, info, reader, writer):
addr = writer.get_extra_info('peername')
while True:
chunk = await reader.read(1024)
if not chunk:
break
print("{0}:{1}".format(info, addr))
hexdump.hexdump(chunk)
writer.write(chunk)
await writer.drain()
print("close sock[{0}]:{1}".format(info, addr))
writer.close()
async def accept_handle(self, a_reader, a_writer):
c_reader, c_writer = await asyncio.open_connection(self.__remote_ip, self.__remote_port)
dltasks = set()
dltasks.add(asyncio.ensure_future(self.__copy_data_thread('c->a', c_reader, a_writer)))
dltasks.add(asyncio.ensure_future(self.__copy_data_thread('a->c', a_reader, c_writer)))
dones, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED)
def main(local_ip, local_port, remote_ip, remote_port):
tp = tcpproxy(remote_ip, remote_port)
loop = asyncio.get_event_loop()
coro = asyncio.start_server(tp.accept_handle, local_ip, local_port, loop=loop)
server = loop.run_until_complete(coro)
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()
if __name__ == "__main__":
#监听本地端口9000,数据转发到127.0.0.1:9001上。
main('0.0.0.0', 9000, '127.0.0.1', 9001)
4.4.2、HTTP
4.4.2.1、服务端
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
app = web.Application()
app.add_routes([web.get('/', handle),
web.get('/{name}', handle)])
if __name__ == '__main__':
web.run_app(app, port=7777)
4.4.2.2、客户端
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://127.0.0.1:7777/python_t') as response:
print("Status:", response.status)
print("Content-type:", response.headers['content-type'])
html = await response.text()
print("Body:", html[:15], "...")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
4.4.3、WebSocket
4.4.3.1、服务端
import asyncio
import websockets
async def recv_msg(websocket):
while True:
recv_text = await websocket.recv()
response_text = f"echo: {recv_text}"
await websocket.send(response_text)
async def main_logic(websocket, path):
await recv_msg(websocket)
if __name__ == '__main__':
start_server = websockets.serve(main_logic, '0.0.0.0', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()
4.4.3.2、客户端
import asyncio
import websockets
async def send_msg(websocket):
while True:
_text = input("please enter your context: ")
if _text == "exit":
print(f'you have enter "exit", goodbye')
await websocket.close(reason="user exit")
return False
await websocket.send(_text)
recv_text = await websocket.recv()
print(f"{recv_text}")
async def main_logic():
async with websockets.connect('ws://127.0.0.1:5678') as websocket:
await send_msg(websocket)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main_logic())
4.4.3.3、HTML+JS客户端
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Websocket</title>
</head>
<body>
<h1>Echo Test</h1>
<input type="text" id="sendTxt">
<button id="sendBtn">发送</button>
<div id="recv"></div>
<script>
var websocket = new WebSocket("ws://127.0.0.1:5678/");
websocket.onopen = function()
{
console.log('websocket open');
document.getElementById('recv').innerHTML = "Connected";
}
websocket.onclose = function()
{
console.log('websocket close');
}
websocket.onmessage = function(e)
{
console.log(e.data);
document.getElementById("recv").innerHTML = e.data;
}
document.getElementById("sendBtn").onclick = function()
{
let txt = document.getElementById("sendTxt").value;
websocket.send(txt);
}
</script>
</body>
</html>
4.5、物联网(MQTT)
MQTT测试服务器
Broker: broker.emqx.io
TCP 端口: 1883
Websocket 端口: 8083
TCP/TLS 端口: 8883
Websocket/TLS 端口: 8084
MQTT网页版测试客户端
https://www.emqx.cn/mqtt/mqtt-websocket-toolkit
安装第三方库:pip install paho-mqtt
4.5.1、订阅主题
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.emqx.io", 1883, 600)
client.subscribe('emqtt',qos=0)
client.loop_start()
4.5.2、发布消息
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect("broker.emqx.io", 1883, 600)
client.publish('emqtt',payload='Hello,EMQ!',qos=0)
client.loop_start()
4.6、数据库(MySQL)
import mysql.connector
mydb = mysql.connector.connect(
host="localhost",
user="root",
passwd="123456",
database="runoob_db"
)
mycursor = mydb.cursor()
mycursor.execute("SELECT * FROM sites")
myresult = mycursor.fetchall() # fetchall() 获取所有记录
for x in myresult:
print(x)
全部评论 (0)
还没有任何评论哟~
