" name="sm-site-verification"/>
侧边栏壁纸
博主头像
PySuper 博主等级

千里之行,始于足下

  • 累计撰写 234 篇文章
  • 累计创建 15 个标签
  • 累计收到 2 条评论

目 录CONTENT

文章目录

生产消费、观察者、发布订阅

PySuper
2024-12-14 / 0 评论 / 0 点赞 / 7 阅读 / 0 字
温馨提示:
所有牛逼的人都有一段苦逼的岁月。 但是你只要像SB一样去坚持,终将牛逼!!! ✊✊✊

区别分析

1. 生产者-消费者模式

  • 核心思想:通过一个共享的缓冲区或队列,将生产者和消费者分开解耦。生产者生产数据放入队列,消费者从队列中取出数据处理。

  • 场景:适合需要平衡生产与消费速度的场景,如日志处理、异步任务等。

2. 观察者模式

  • 核心思想:一个对象的状态发生改变时,会自动通知所有依赖于它的观察者对象。观察者与被观察者之间是多对一的依赖关系。

  • 场景:适合需要通知多个对象更新状态的场景,如GUI事件系统。

3. 发布-订阅模式

  • 核心思想:消息的发布者和订阅者通过消息中间件解耦,发布者发送消息到中间件,订阅者从中间件中获取消息。

  • 场景:适合松耦合、多对多的通信场景,如微服务架构中的消息系统。


Python 示例代码

1. 生产者-消费者模式

使用 Redis 实现生产者-消费者队列。

代码:

import threading
import time

import redis

# 连接Redis
redis_client = redis.StrictRedis(host='localhost', port=6379, decode_responses=True)
queue_name = 'task_queue'

def producer():
    for i in range(10):
        task = f"Task-{i}"
        redis_client.rpush(queue_name, task)
        print(f"[Producer] Produced: {task}")
        time.sleep(0.5)

def consumer():
    while True:
        task = redis_client.blpop(queue_name, timeout=5)
        if task:
            print(f"[Consumer] Consumed: {task[1]}")
        else:
            print("[Consumer] No more tasks, exiting...")
            break

# 启动线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join(

2. 观察者模式

实现一个简单的事件通知机制。

代码:

class Subject:
    def init(self):
        self._observers = []

    def attach(self, observer):
        self._observers.append(observer)

    def detach(self, observer):
        self._observers.remove(observer)

    def notify(self, message):
        for observer in self._observers:
            observer.update(message)


class Observer:
    def update(self, message):
        raise NotImplementedError("Subclasses must implement this method!")


class ConcreteObserverA(Observer):
    def update(self, message):
        print(f"[Observer A] Received: {message}")


class ConcreteObserverB(Observer):
    def update(self, message):
        print(f"[Observer B] Received: {message}")


# 示例
subject = Subject()
observer_a = ConcreteObserverA()
observer_b = ConcreteObserverB()
subject.attach(observer_a)
subject.attach(observer_b)
subject.notify("Event 1")
subject.notify("Event 2")

3. 发布-订阅模式

使用 Redis 的发布/订阅功能。

代码:

import threading
import time

import redis

# 连接Redis
redis_client = redis.StrictRedis(host="localhost", port=6379, decode_responses=True)
channel_name = "news_channel"


def publisher():
    messages = ["Breaking News 1", "Breaking News 2", "Breaking News 3"]
    for message in messages:
        redis_client.publish(channel_name, message)
        print(f"[Publisher] Published: {message}")


def subscriber():
    pubsub = redis_client.pubsub()
    pubsub.subscribe(channel_name)
    print("[Subscriber] Subscribed to channel...")
    for message in pubsub.listen():
        if message["type"] == "message":
            print(f"[Subscriber] Received: {message['data']}")


# 启动线程
publisher_thread = threading.Thread(target=publisher)
subscriber_thread = threading.Thread(target=subscriber)
subscriber_thread.start()
time.sleep(1)  # 确保订阅已启动
publisher_thread.start()
publisher_thread.join()

总结

  • 生产者-消费者:通过队列解耦生产与消费,适合异步任务处理。

  • 观察者模式:直接绑定对象之间的通知机制,适合状态更新场景。

  • 发布-订阅:通过中间件实现松耦合通信,适合复杂分布式系统

0

评论区