Random Thoughts

✨ Suy nghĩ vu vơ

Nodejs eventloop hoạt động như thế nào, Libuv, epoll


Một ngày cuối năm đẹp trời, tôi bị đứa bạn thân ai nấy lo lâu năm hỏi một câu, mày biết tại sao cái thằng Nodejs, Redis là Single-Thread nhưng mà sao nó vẫn chạy nhanh như thế không. Thú thật là mình không biết, vào đọc mấy cái medium thì cũng k hiểu gì. Thôi thì tự code một cái event-loop , cũng là để hiểu event-loop nó hoạt động như thế nào.

Một số khái niệm

Nào cùng nhau tra cứu một số khái niệm, để hiểu được bài viết này, cảnh báo là nhiều chữ và nhiều code nên các bạn cứ thảnh thơi ra làm ấm trà, điếu thuốc rồi vào đọc cho nó thư thả.

File Descriptors / File Descriptor Table

Trong linux có một câu nói khá nổi tiếng Everything is a file, File ở đây có thể là.

  • File (Đương nhiên rồi)
  • Terminal I/O (stdin/stdout/stderr)
  • pipe
  • sockets
  • device

Khi một tiến trình được khởi chạy thì mặc định sẽ được truy cập đến 3 tài nguyên

  • stdin
  • stdout
  • stderr

Các tài nguyên này được lưu trong bảng gọi là File Descriptor Table (FDTable) với chỉ số (index) là File Descriptor(FD)

FD Pointer
1 stdin pointer
2 stdout pointer
3 stderr pointer

Nếu tiến trình này mở một file mới, thì file mới sẽ được add vào FDTable, Tương tự khi tiến trình này mở 1 connection, một pipe, tất cả các tài nguyên này, đều là file, và được lưu ở FDTable

FD Pointer
1 stdin pointer
2 stdout pointer
3 stderr pointer
4 file pointer

Timer / Callback

Cùng nói qua một chút về timercallback trong javascipt. Chúng ta cùng xem xét đoạn code sau

setTimeout(function cb() {
    console.log("callback")
}, 5000)

thì nodejs sẽ khởi tạo 1 timer, sau khi timerđó kết thúc, thì sẽ đẩy callback vào task queue

Libuv / Eventloop

Nhắc đến Event Loop trong javascript thì chắc chẳng ai còn lạ gì nữa, nếu thấy lạ thì mời bạn xem video rất nổi tiếng sau đây:

Libuv là gì, nó là thư viện để xử lý các vấn đề liên quan đến bất đồng bộ. Libuv là nền tảng của event-loop trong Nodejs. Video ở trên đã giải thích được nodejs tương tác với stack , task queue, event loop như thế nào. Bài viết này sẽ giải thích cách mà event loop hoạt đột.

    libuv is a multi-platform support library with a focus on asynchronous I/O. 
    It was primarily developed for use by Node.js, but it's also used by Luvit, Julia, pyuv, and others.

'Node JS System'

Để dễ hiểu hơn về cơ chế hoạt động của event-loop. hãy bắt đầu thử viết một event loop đơn giản, xử lý kết nối qua socket. Tôi sẽ đưa ra 2 ví dụ về 2 cách viết.

Blocking Socket Server

blocking_socket.pydownload
import socket

EOL = b'\n'
response = b'Hello world'
HOST = '127.0.0.1'  # The server's hostname or IP address
PORT = 65432        # The port used by the server

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.bind((HOST, PORT))
    server.listen()

    while True:
        conn, address = server.accept()
        with conn:
            req = b''
            while EOL not in req:
                req += conn.recv(1024)
                print('-' * 50 + '\n')
                print(req.decode())
                print('-' * 50 + '\n')

            conn.send(response)

Nói đến lập trình socketsocket thì ví dụ trên là một chương trình socket điển hình. - Khởi tạo một socket server, lắng nghe ở port 654321. - Tạo một vòng lặp vô tận, chờ một kết nối đến - nhận dữ liệu từ client cho đến khi có kí tự EOL trong nội dung. - Đóng kết nối và gửi lại client nội dung Hello world - Tiếp tục một vòng lặp mới

Nhưng vấn đề ở chương trình này là gì, đó là nó bị blocking ở câu lệnh sau

    conn, address = server.accept()

Tại dòng lệnh này thì trình dịch python sẽ dừng chương trình lại, không xử lý gì cả, chờ đợt cho đến khi có một connection mới. Thuật ngữ thường được gọi là blocking IO. Khi có dữ liệu mới từ client, chương trình tiếp tục xử lý và sau đó quay lại chu kì lặp và tiếp tục chờ đợi. Dẫn đến chương trình này chỉ làm việc được với tối đa 1 client trong 1 thời điểm, những client sau đó phải chờ cho đến khi client trước đó hoàn thành phiên làm việc mới được xử lý.

Các bạn có thể xem demo chường trình này dưới đây, tôi cùng một lúc khởi tạo 2 client với id là 12 đến socket server. Mỗi client hoạt động theo logic như sau: - Khởi tạo kết nối đến server - Sau một khoảng thời gian nhất định, gửi 1 xâu có giá trị hello from {client_id} đến server - Sau 10 lần gửi thông điệp client sẽ gửi EOL đến server

Các bạn có thể thấy, server xử lý tuần tự 1 client trong 1 thời điểm, sau khi hoàn thành xử lý với client 1, thì server mới tiếp tục làm việc với client 2

asciicast

client.pydownload
import socket
import time
from typing import SupportsInt


import argparse
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument("client_id", type=int, action="store")
parser.add_argument("inteval", type=int, action="store")
args = parser.parse_args()
inteval: int = args.inteval
client_id : int = args.client_id



HOST = '127.0.0.1'  # The server's hostname or IP address
PORT = 65432        # The port used by the server


with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:

    s.connect((HOST, PORT))
    for _ in range(10) :
        time.sleep(inteval)
        print(b'hello from %d' % client_id)
        s.sendall(b'hello from %d: ' % client_id)
    s.sendall(b'Hello, world\n')
    data = s.recv(1024)

print('Received', repr(data))

Để giải quyết bài toán này thì mọi người thường nghĩ đến một giải pháp là multithread, đây cũng là giải pháp thường được các thầy giáo hướng dẫn ở trong trường đại học. Mỗi khi có một kết nối đến server thì chương trình sẽ khởi tạo 1 thread mới, xử lý data được gửi đến từ client và trả lại dữ liệu cho client.

Nhược điểm của phương pháp này nó là, mỗi thread sẽ có call stack riêng, và việc chuyển đổi giữa các call stack cũng ảnh hưởng tới hiệu năng của chương trình. Một cách khác để giải quyết vấn đề này đó chính là non-blocking IO, nói một cách khác, chúng ta sẽ không bắt chương trình chờ cho đến khi có data nữa.

Non-Blocking Socket Server

'Epoll'

Để giải quyết bài toán trên mà không sử dụng đến multithread, chúng ta cần sử dụng một system callepoll. epoll là 1 câu lệnh của hệ điều hành linux (system call), đưa cho epoll một hoặc nhiều file descriptors, epoll sẽ trả về cho chương trình những file nào có thể đọc được.

Quay lại về bài toán lập trình socket. Để sử dụng epoll thì chúng ta sẽ thay đổi logic như sau:

  • Khởi tạo một socket, và epoll
  • Đăng kí socket file descriptor cùng sự kiện EPOLLIN vào trong epoll
  • Tại một vòng lặp vô tận:
    • kiểm tra xem epoll có event mới nào không
    • nếu có sự kiện mới thì xử lý sự kiện đó, và tiếp tục lặp
    • nếu không thì tiếp tục quay lại vòng lặp

'Epoll Flow'

lang:pythonnon_blocking_socket.py[Lines 67-94]download
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server, epoll_context(server.fileno(), select.EPOLLIN) as epoll:
    server.bind((HOST, PORT))
    server.setblocking(False)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    server.listen(5)

    print("Listening")
    connections : Dict[int, socket.socket] = {}


    requests : Dict[int, bytes]= {}
    responses: Dict[int, bytes] = {}
    server_fd = server.fileno()

    while True:
        events = epoll.poll(0)
        print("waiting..")
        for fileno, event in events:
            if fileno == server_fd:
                init_connection(server, connections, requests, responses, epoll)
            elif event & select.EPOLLIN:
                receive_request(fileno, connections, requests, responses, epoll)
            elif event & select.EPOLLOUT:
                send_response(fileno, connections, responses, epoll)

Đối với mỗi loại event thì server sẽ xử lý bằng những hàm tương ứng, chúng ta cùng xem kỹ hơn cách server xử lý từng loại sự kiện.

Có kết nối mới từ client

Khi kiểm tra file descriptor của sự kiện mới là socket server file descriptor chúng ta hiểu được rằng là đã có một kết nối đến server.

  • Bởi vì kết nối cũng là 1 file, nên chúng ta sẽ đăng kí fd của kết nối này vào trong epoll
  • Mỗi khi có dữ liệu mới đến từ kết nối này, epoll sẽ tạo event mới cho chúng ta
lang:python :hideall:non_blocking_socket.py[Lines 26-38]download
def init_connection(server: socket.socket, connections: Dict[int, socket.socket], requests : Dict[int, bytes], responses : Dict[int, bytes], epoll: select.epoll):

    conn: socket.socket
    conn, addr = server.accept()
    conn.setblocking(False)
    print(f"New Connection: {addr}")

    fd = conn.fileno()

    epoll.register(fd, select.EPOLLIN)
    connections[fd] = conn
    requests[fd] = b''
    responses[fd] = b''

Có dữ liệu từ kết nối:

  • Đọc dữ liệu từ kết nối
  • Nếu kết nối bị ngắt, xóa fd tương ứng khỏi epoll
  • Nếu có EOL trong dữ liệu thì set event cho fd trở thành EPOLLOUT, tương ứng với việc thông báo cho chương trình là đã đọc hết dữ liệu từ client này, hãy hồi đáp về cho client
lang:python :hideall:non_blocking_socket.py[Lines 40-58]download
def receive_request(fileno: int,  connections: Dict[int, socket.socket], requests : Dict[int, bytes], responses : Dict[int, bytes], epoll: select.epoll):

    requests[fileno] += connections[fileno].recv(1024)

    print(f"new updated {fileno} {requests[fileno]}")

    if requests[fileno] == QUIT or requests[fileno] == EMPTY:
        print('[{:02d}] exit or hung up'.format(fileno))
        epoll.unregister(fileno)
        connections[fileno].close()
        del connections[fileno], requests[fileno], responses[fileno]

    elif EOL in requests[fileno]:
        epoll.modify(fileno, select.EPOLLOUT)
        msg = requests[fileno][:-1]
        print("[{:02d}] says: {}".format(fileno, msg))
        responses[fileno] = b'ACK\n'
        requests[fileno] = b''

Có tín hiệu hồi đáp cho client.

  • Gửi dữ liệu cho client
  • Đổi loại event cho kết nối thành EPOLLIN, để server tiếp lắng nghe dữ liệu mới trên kết nối này
lang:python :hideall:non_blocking_socket.py[Lines 59-64]download
def send_response(fileno: int,  connections: Dict[int, socket.socket],  responses : Dict[int, bytes], epoll: select.epoll):
    """Send a response to a client."""
    byteswritten = connections[fileno].send(responses[fileno])
    responses[fileno] = responses[fileno][byteswritten:]
    epoll.modify(fileno, select.EPOLLIN)

asciicast

LibUV

'Epoll'

Nói một cách đơn giản, Libuv chỉ là 1 vòng lặp.

  • Đầu tiên chương trình sẽ kiểm tra vòng lặp này có đang hoạt động hay không? Chương trình có handler nào hay không, có kết nối nào hay không
  • Thực thi những timer đã hết thời gian chờ. Khi một timer hết hạn, thì callback của timer đó sẽ được đẩy vào queue
  • Thực thi các callback đang được pending trong task queue.
  • Tính toán thời gian chờ (timeout) khi polling
  • Chờ IO trong khoảng thời gian timeout
  • Kiểm tra callback handler đã được thực thi hay chưa.
  • Thực thi close callbacks