Advertisement

python初学者学习简单教程

阅读量:

一、编写理念

  1. 很多教程虽然看似系统,但初学者耗时长却难以理解所学用途.本文引导读者迅速掌握核心内容,并采用少而精的学习方式,从而实现真正掌握python编程的核心目标.
  2. 这种做法并非全面讲解python语言,而是重点突出核心内容,并结合实际案例进行操作演示.对于其他领域相关知识,则需通过持续实践来逐步深入掌握.
  3. 本文示例代码力求简短实用,帮助读者以最高效的方式掌握基础技能;而不做理论教学相关内容的展开,如有疑问可留言进一步交流.

读者想学什么可以留言给我,我会考虑编写对应章节。

二、开发环境

为了更好地帮助大家掌握相关知识而准备的内容

在这里插入图片描述

三、基础知识

3.1、入门

复制代码
    if __name__ == '__main__':
    print("Hello World!")

3.2、变量及注释

复制代码
    if __name__ == '__main__':
    	# 这是一个注释	
    str = "Hello World!"
    print(str)

3.3、条件控制

复制代码
    if __name__ == '__main__':
    str = "AAA"
    if str == "AAA":
        print("Yes")
    else:
        print("No")
    print(str)

3.4、循环控制(for)

复制代码
    if __name__ == '__main__':
    for i in range(1, 10):
        print(i)

3.5、循环控制(while)

复制代码
    if __name__ == '__main__':
    i = 1
    while i <= 10:
        print(i)
        i = i + 1

3.6、函数

复制代码
    def print_num(min, max):
    i = min
    while i <= max:
        print(i)
        i = i + 1    
    
    if __name__ == '__main__':
    print_num(1, 10)

3.7、数据结构

3.7.1、列表

复制代码
    if __name__ == '__main__':
    l = [10, 20, 30, 40, 50, 60, 70, 80, 90]
    #截取列表一部分
    print(l[0:4])
    
    #遍历列表
    for x in l:
        print(x, end = ",")
    print()
    
    #追加1000
    l.append(1000)
    print(l)
    
    #删除30
    del l[2]
    print(l)
    
    #列表长度
    print(len(l))

3.7.2、字典

复制代码
    dict = {'Name': 'Runoob', 'Age': 7, 'Class': 'First'}
     
    print ("dict['Name']: ", dict['Name'])
    print ("dict['Age']: ", dict['Age'])
    dict['Age'] = 8               # 更新 Age
    dict['School'] = "菜鸟教程"  # 添加信息
    print ("dict['Age']: ", dict['Age'])
    print ("dict['School']: ", dict['School'])

3.8、类和对象

复制代码
    #类定义
    class people:
    #定义基本属性
    name = ''
    age = 0
    #定义私有属性,私有属性在类外部无法直接进行访问
    __weight = 0
    #定义构造方法
    def __init__(self, n, a, w):
        self.name = n
        self.age = a
        self.__weight = w
    def speak(self):
        print("%s 说: 我 %d 岁。" %(self.name,self.age))
    if __name__ == '__main__': 
    	# 实例化类
    	p = people('runoob',10,30)
    	p.speak()
    	print("name = {0}, age = {1}".format(p.name, p.age))

3.9、模块

复制代码
    # 文件名: support.py
    def print_func( par ):
    print ("Hello : ", par)
    return
复制代码
    # 文件名: test.py
    # 导入模块
    import support
    # 现在可以调用模块里包含的函数了
    support.print_func("Runoob")

3.10、错误

复制代码
    while True print('Hello world')
    File "<stdin>", line 1, in ?
       while True print('Hello world')
                   ^
    SyntaxError: invalid syntax

3.11、异常

复制代码
    while True:
    try:
        x = int(input("请输入一个数字: "))
        break
    except ValueError as err:
        print("您输入的不是数字,请再次尝试输入:", err)
    finally: 
    	print("无论出现什么异常都会执行到这里!")

四、常用第三方库

4.1、安装第三方库

打开工具-管理包菜单,在其中添加要安装的第三方库名并进行查找。找到后选择并完成该第三方库的安装。

在这里插入图片描述

4.2、协程使用

请参考官方文档:https://www.apiref.com/python-zh/library/asyncio.html

4.2.1、入门

复制代码
    import asyncio
    
    async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')
    
    # Python 3.7+
    asyncio.run(main())

4.2.2、多任务

复制代码
    import asyncio
    import time
    
    async def task(task_name, sleep_time):
    while True:
        print(task_name)
        await asyncio.sleep(sleep_time)
    
    if __name__ == '__main__':
    dltasks = set()
    dltasks.add(asyncio.ensure_future(task("task1", 5)))
    dltasks.add(asyncio.ensure_future(task("task2", 8)))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(dltasks))

4.2.3、调用阻塞函数

复制代码
    import time
    import asyncio
    from concurrent.futures import ThreadPoolExecutor
    
    class aioTest:
    def __init__(self):
        self.__executor = ThreadPoolExecutor(3)
        self.__loop = asyncio.get_event_loop()
    
    def __print_time(self, threadName, delay):
        print("print_time start")
        count = 0
        while count < 5:
            print("count = %s" % count)
            time.sleep(delay)
            count += 1
    
        sRet = threadName + ":" + time.ctime(time.time())
        print("print_time end")
        return sRet
    
    def print_time(self, threadName, delay):
        return self.__loop.run_in_executor(self.__executor, self.__print_time, threadName, delay)
    
    async def main():
    at = aioTest()
    print('suspending the coroutine')
    result = await at.print_time("Thread-1", 1)
    return result
    
    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

4.3、文件读写&Json读写

复制代码
    # -*- coding: utf-8 -*-
    import asyncio
    import aiofiles
    import json
    
    async def main():
    txtFileName = "xx.txt"
    json_data = {
        'no' : 1,
        'name' : 'Runoob',
        'url' : 'http://www.runoob.com'
    }
    
    async with aiofiles.open(txtFileName, "w", encoding="UTF-8") as fp:
       json_str = json.dumps(json_data, indent=4, ensure_ascii=False)
       await fp.write(json_str)
    
    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
复制代码
    # -*- coding: utf-8 -*-
    import asyncio
    import aiofiles
    import json
    
    async def main():
    txtFileName = "xx.txt"
    json_data = {}
    async with aiofiles.open(txtFileName , 'r', encoding='utf-8') as fp:
        json_str = await fp.read()
        json_data = json.loads(json_str)
        print(json_data)
        
    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

4.4、网络通讯

4.4.1、TCP/IP

4.4.1.1、服务端
复制代码
    import asyncio
    
    async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    
    print(f"Received {message!r} from {addr!r}")
    
    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()
    
    print("Close the connection")
    writer.close()
    
    async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)
    
    addr = server.sockets[0].getsockname()
    print(f'Serving on {addr}')
    
    async with server:
        await server.serve_forever()
    if __name__ == '__main__':
    	asyncio.run(main())
4.4.1.2、客户端
复制代码
    import asyncio
    
    async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)
    
    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()
    
    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')
    
    print('Close the connection')
    writer.close()
    await writer.wait_closed()
    
    if __name__ == '__main__':
    	asyncio.run(tcp_echo_client('Hello World!'))
4.4.1.3、TCP端口映射
复制代码
    #!/usr/bin/python3
    #-*- coding: utf-8 -*-
    import asyncio
    import hexdump
    
    class tcpproxy:
    def __init__(self, remote_ip, remote_port):
        self.__remote_ip = remote_ip
        self.__remote_port = remote_port
        
    async def __copy_data_thread(self, info, reader, writer):
        addr = writer.get_extra_info('peername')
    
        while True:
            chunk = await reader.read(1024)
            if not chunk:
                break
            print("{0}:{1}".format(info, addr))
            hexdump.hexdump(chunk)
            writer.write(chunk)
            await writer.drain()
    
        print("close sock[{0}]:{1}".format(info, addr))
        writer.close()
    
    async def accept_handle(self, a_reader, a_writer):
        c_reader, c_writer = await asyncio.open_connection(self.__remote_ip, self.__remote_port)
    
        dltasks = set()
        dltasks.add(asyncio.ensure_future(self.__copy_data_thread('c->a', c_reader, a_writer)))
        dltasks.add(asyncio.ensure_future(self.__copy_data_thread('a->c', a_reader, c_writer)))
        dones, dltasks = await asyncio.wait(dltasks, return_when=asyncio.FIRST_COMPLETED)
    
    def main(local_ip, local_port, remote_ip, remote_port):
    tp = tcpproxy(remote_ip, remote_port)
    
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(tp.accept_handle, local_ip, local_port, loop=loop)
    server = loop.run_until_complete(coro)
    
    print('Serving on {}'.format(server.sockets[0].getsockname()))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    
    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
    
    if __name__ == "__main__":
    	#监听本地端口9000,数据转发到127.0.0.1:9001上。
    main('0.0.0.0', 9000, '127.0.0.1', 9001)

4.4.2、HTTP

4.4.2.1、服务端
复制代码
    from aiohttp import web
    
    async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)
    
    app = web.Application()
    app.add_routes([web.get('/', handle),
                web.get('/{name}', handle)])
    
    if __name__ == '__main__':
    web.run_app(app, port=7777)
4.4.2.2、客户端
复制代码
    import aiohttp
    import asyncio
    
    async def main():
    
    async with aiohttp.ClientSession() as session:
        async with session.get('http://127.0.0.1:7777/python_t') as response:
            print("Status:", response.status)
            print("Content-type:", response.headers['content-type'])
    
            html = await response.text()
            print("Body:", html[:15], "...")
    
    if __name__ == '__main__':
    	loop = asyncio.get_event_loop()
    	loop.run_until_complete(main())

4.4.3、WebSocket

4.4.3.1、服务端
复制代码
    import asyncio
    import websockets
    
    async def recv_msg(websocket):
    while True:
        recv_text = await websocket.recv()
        response_text = f"echo: {recv_text}"
        await websocket.send(response_text)
    
    async def main_logic(websocket, path):
    await recv_msg(websocket)
    
    if __name__ == '__main__':
    start_server = websockets.serve(main_logic, '0.0.0.0', 5678)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
4.4.3.2、客户端
复制代码
    import asyncio
    import websockets
    
    async def send_msg(websocket):
    while True:
        _text = input("please enter your context: ")
        if _text == "exit":
            print(f'you have enter "exit", goodbye')
            await websocket.close(reason="user exit")
            return False
    
        await websocket.send(_text)
        recv_text = await websocket.recv()
        print(f"{recv_text}")
    
    async def main_logic():
    async with websockets.connect('ws://127.0.0.1:5678') as websocket:
        await send_msg(websocket)
    
    if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main_logic())
4.4.3.3、HTML+JS客户端
复制代码
    <!DOCTYPE html>
    <html lang="en">
    <head>
    <meta charset="UTF-8">
    <title>Websocket</title>
    </head>
    <body>
    <h1>Echo Test</h1>
    <input type="text" id="sendTxt">
    <button id="sendBtn">发送</button>
    <div id="recv"></div>
    <script>
        var websocket = new WebSocket("ws://127.0.0.1:5678/");
        
    		websocket.onopen = function() 
    		{
            console.log('websocket open');
            document.getElementById('recv').innerHTML = "Connected";
        }
        
    		websocket.onclose = function() 
    		{
            console.log('websocket close');
        }
        
    		websocket.onmessage = function(e) 
    		{
            console.log(e.data);
            document.getElementById("recv").innerHTML = e.data;
        }
        
    		document.getElementById("sendBtn").onclick = function() 
    		{
            let txt = document.getElementById("sendTxt").value;
            websocket.send(txt);
        }
    </script>
    </body>
    </html>

4.5、物联网(MQTT)

MQTT测试服务器

复制代码
    Broker: broker.emqx.io
    TCP 端口: 1883
    Websocket 端口: 8083
    TCP/TLS 端口: 8883
    Websocket/TLS 端口: 8084

MQTT网页版测试客户端
https://www.emqx.cn/mqtt/mqtt-websocket-toolkit

安装第三方库:pip install paho-mqtt

4.5.1、订阅主题

复制代码
    import paho.mqtt.client as mqtt
     
    def on_connect(client, userdata, flags, rc):
      print("Connected with result code "+str(rc))
     
    def on_message(client, userdata, msg):
      print(msg.topic+" "+str(msg.payload))
     
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("broker.emqx.io", 1883, 600)
    client.subscribe('emqtt',qos=0)
    client.loop_start()

4.5.2、发布消息

复制代码
    import paho.mqtt.client as mqtt
    
    def on_connect(client, userdata, flags, rc):
      print("Connected with result code "+str(rc))
     
    def on_message(client, userdata, msg):
      print(msg.topic+" "+str(msg.payload))
     
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect("broker.emqx.io", 1883, 600)
    client.publish('emqtt',payload='Hello,EMQ!',qos=0)
    client.loop_start()

4.6、数据库(MySQL)

复制代码
    import mysql.connector 
    mydb = mysql.connector.connect(
      host="localhost",
      user="root",
      passwd="123456",
      database="runoob_db"
    )
    mycursor = mydb.cursor()
    mycursor.execute("SELECT * FROM sites")
    myresult = mycursor.fetchall()     # fetchall() 获取所有记录
    for x in myresult:
      print(x)

全部评论 (0)

还没有任何评论哟~