Python Day 12
RabbitMQ: Hello World!
Sending:
1) establish connection
#!/usr/bin/env python
import pika
# establish a connection with RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
2) create a queue to which the message will be delivered
channel.queue_declare(queue='hello')
In RabbitMQ a message can never be sent directly to a queue; it always needs to go through an exchange.
Here we use the default exchange, identified by an empty string. It lets us specify exactly which queue the message should go to: the queue name is given in the routing_key parameter.
3) sending
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" Sent 'Hello World!'")
4) close connection
connection.close()
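The default-exchange routing described above can be sketched without a running broker. This is a minimal in-memory model, not pika: the class ToyBroker and its attributes are invented for illustration, and only the method names mirror the calls used in these notes. It assumes, as RabbitMQ does by default, that a message whose routing key names no existing queue is dropped.

```python
from collections import deque


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker (not pika, not RabbitMQ)."""

    def __init__(self):
        self.queues = {}  # queue name -> pending messages

    def queue_declare(self, queue):
        # Create the queue only if it does not exist yet.
        self.queues.setdefault(queue, deque())

    def basic_publish(self, exchange, routing_key, body):
        if exchange != '':
            raise ValueError('this sketch models only the default exchange')
        # Default exchange: routing_key is interpreted as the queue name.
        if routing_key not in self.queues:
            return False  # unroutable message, dropped in this model
        self.queues[routing_key].append(body)
        return True


broker = ToyBroker()
broker.queue_declare(queue='hello')
delivered = broker.basic_publish(exchange='', routing_key='hello', body='Hello World!')
dropped = broker.basic_publish(exchange='', routing_key='no_such_queue', body='lost')
print(delivered, dropped, list(broker.queues['hello']))
```

The boolean return value is a convenience of the sketch so the two outcomes can be told apart.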
Receiving:
1) connect to RabbitMQ server. The code responsible for connecting to RabbitMQ is the same as before.
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))
channel = connection.channel()
2) make sure that the queue exists. Creating a queue using queue_declare is idempotent: we can run the command as many times as we like, and only one queue will be created.
channel.queue_declare(queue='hello')
You may ask why we declare the queue again, given that we already declared it in the sending code. We could skip it if we were sure the queue already exists, for example if send.py had been run first. But we can't yet be sure which program will run first, so it's good practice to declare the queue in both programs (sending and receiving).
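To see what idempotent means here, the following is a small sketch under the assumption that a queue is just a named list held by the broker. ToyBroker is a made-up class, not part of pika. Declaring the same name three times leaves one queue, and the message published in between survives.

```python
class ToyBroker:
    """Hypothetical in-memory broker used only to illustrate idempotent declaration."""

    def __init__(self):
        self.queues = {}  # queue name -> list of pending messages

    def queue_declare(self, queue):
        # setdefault leaves an existing queue (and its messages) untouched.
        return self.queues.setdefault(queue, [])


broker = ToyBroker()
broker.queue_declare(queue='hello')            # e.g. issued by the sender
broker.queues['hello'].append('Hello World!')  # a message arrives
broker.queue_declare(queue='hello')            # e.g. issued by the receiver
broker.queue_declare(queue='hello')            # repeated again, still harmless

queue_count = len(broker.queues)
pending = list(broker.queues['hello'])
print(queue_count, pending)
```

A real RabbitMQ server additionally rejects a redeclaration whose parameters differ from those of the existing queue; the sketch leaves that out.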
3) Receiving messages from the queue works by subscribing a callback function to a queue. Whenever we receive a message, this callback function is called by the Pika library.
# deal with received messages
def callback(ch, method, properties, body):
    # print on the screen the contents of the message
    print(" Received %r" % body)
4) tell RabbitMQ that this particular callback function should receive messages from our hello queue
channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)
5) enter a never-ending loop that waits for data and runs callbacks whenever necessary
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
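The subscribe-then-loop pattern above can be tried out with a toy channel. This is only a sketch: ToyChannel and its drain method are hypothetical, and drain returns once the queue is empty instead of blocking forever as start_consuming does.

```python
from collections import deque


class ToyChannel:
    """Hypothetical in-memory channel; only the method names echo pika."""

    def __init__(self):
        self.queues = {'hello': deque()}
        self.consumers = {}  # queue name -> callback

    def basic_publish(self, routing_key, body):
        self.queues[routing_key].append(body)

    def basic_consume(self, callback, queue):
        # Subscribing only records the callback; nothing is delivered yet.
        self.consumers[queue] = callback

    def drain(self):
        # Stand-in for start_consuming(): deliver what is queued, then return
        # instead of blocking forever.
        for queue, callback in self.consumers.items():
            while self.queues[queue]:
                callback(self, None, None, self.queues[queue].popleft())


received = []


def callback(ch, method, properties, body):
    # Same signature as the callback in the notes; here it just records the body.
    received.append(body)


channel = ToyChannel()
channel.basic_publish(routing_key='hello', body=b'Hello World!')
channel.basic_consume(callback, queue='hello')
before_loop = list(received)  # still empty: the loop has not run
channel.drain()
print(before_loop, received)
```

The point of the sketch is the ordering: registering the callback delivers nothing by itself, and messages reach the callback only once the loop runs.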
Publish/Subscribe
1) Exchange
There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one, the fanout. It simply broadcasts every message it receives to all the queues it knows.
# create an exchange of fanout type
channel.exchange_declare(exchange='logs',
                         type='fanout')
2) Temporary queues
Create a queue with a random name (the server picks one when we don't supply a name):
result = channel.queue_declare()
Once we disconnect the consumer, the queue should be deleted. There's an exclusive flag for that:
result = channel.queue_declare(exclusive=True)
3) Bindings
We need to tell the exchange to send messages to our queue. The relationship between an exchange and a queue is called a binding.
channel.queue_bind(exchange='logs',
                   queue=result.method.queue)
4) Sending
# routing_key is ignored for fanout exchanges
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
5) Receiving
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
                         type='fanout')
# temp queue
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# bind q to exchange
channel.queue_bind(exchange='logs',
                   queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" %r" % body)
channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
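The publish/subscribe flow (fanout exchange, temporary queues, bindings) can be modelled in a few lines. This is a sketch under the assumption that a queue is a plain list. ToyFanoutExchange, declare_temporary_queue and the 'tmp-' name format are invented here; a real broker chooses its own queue names.

```python
import uuid


class ToyFanoutExchange:
    """Hypothetical in-memory fanout exchange (not pika)."""

    def __init__(self):
        self.bound_queues = []

    def queue_bind(self, queue):
        self.bound_queues.append(queue)

    def basic_publish(self, routing_key, body):
        # A fanout exchange ignores routing_key and copies the message
        # to every bound queue.
        for queue in self.bound_queues:
            queue.append(body)


def declare_temporary_queue(queues):
    # The name format is invented here; a real broker picks its own.
    name = 'tmp-' + uuid.uuid4().hex
    queues[name] = []
    return name


queues = {}
logs = ToyFanoutExchange()
first = declare_temporary_queue(queues)
second = declare_temporary_queue(queues)
logs.queue_bind(queues[first])
logs.queue_bind(queues[second])
logs.basic_publish(routing_key='ignored', body='info: Hello World!')
print(queues[first], queues[second])
```

Each subscriber gets its own copy of the message, which is why every receiver needs a queue of its own rather than a shared one.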
Routing
To subscribe only to a subset of the messages, create a binding with a key:
channel.queue_bind(exchange=exchange_name,
                   queue=queue_name,
                   routing_key='black')  # binding key
The meaning of a binding key depends on the exchange type. The fanout exchange we used previously simply ignores its value.
A fanout exchange is only capable of mindless broadcasting.
Direct exchange
With a direct exchange, a message goes to the queues whose binding key exactly matches the routing key of the message.
1) sending
# creating exchange
channel.exchange_declare(exchange='direct_logs',
                         type='direct')
# sending using direct exchange
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
2) Subscribing
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
# bind q with binding key
for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)
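The exact-match rule of a direct exchange can be sketched as a dictionary from binding key to queues. ToyDirectExchange and the queue names errors_only and everything are hypothetical; the severities are sample values, not taken from a real log stream.

```python
from collections import defaultdict


class ToyDirectExchange:
    """Hypothetical in-memory direct exchange (not pika)."""

    def __init__(self):
        self.bindings = defaultdict(list)  # binding key -> bound queues

    def queue_bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def basic_publish(self, routing_key, body):
        # Deliver only to queues whose binding key equals the routing key.
        for queue in self.bindings.get(routing_key, []):
            queue.append(body)


direct_logs = ToyDirectExchange()
errors_only = []
everything = []

direct_logs.queue_bind(errors_only, routing_key='error')
for severity in ('info', 'warning', 'error'):
    direct_logs.queue_bind(everything, routing_key=severity)

direct_logs.basic_publish(routing_key='info', body='started')
direct_logs.basic_publish(routing_key='error', body='disk full')
direct_logs.basic_publish(routing_key='debug', body='no binding, dropped')
print(errors_only, everything)
```

One queue can hold several bindings, as the loop shows, and a message whose routing key matches no binding reaches nobody.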
Remote procedure call
1) Client interface: expose a method named call which sends an RPC request and blocks until the answer is received
fibonacci_rpc = FibonacciRpcClient()
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
2) In order to receive a response the client needs to send a 'callback' queue address with the request
# create a temp queue to receive response
result = channel.queue_declare(exclusive=True)
callback_queue = result.method.queue
# use default exchange to send request along with callback_queue
channel.basic_publish(exchange='',
                      routing_key='rpc_queue',
                      properties=pika.BasicProperties(
                          reply_to=callback_queue,
                      ),
                      body=request)
For each RPC request, the client sends a message with two properties: reply_to and correlation_id.
reply_to: set to the callback queue
correlation_id: set to a unique value for every request, so the client can match each response to its request
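How reply_to and correlation_id work together can be sketched in memory. Everything here is an assumption for illustration: ToyRpcClient, toy_server and the list used as a callback queue are invented, and the server is called directly instead of through a broker. The server deliberately also puts a stale reply on the queue to show why the client checks the correlation id.

```python
import uuid


def fib(n):
    # Iterative Fibonacci with fib(0) == 0 and fib(1) == 1.
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def toy_server(reply_to, correlation_id, body):
    # Hypothetical server: answers on the queue named in reply_to.
    reply_to.append(('some-other-id', -1))        # a reply for another request
    reply_to.append((correlation_id, fib(body)))  # the reply we are waiting for


class ToyRpcClient:
    """Hypothetical in-memory RPC client (not pika)."""

    def __init__(self, server):
        self.server = server
        self.callback_queue = []  # stands in for the exclusive reply queue
        self.corr_id = None
        self.response = None

    def on_response(self, correlation_id, body):
        # Ignore replies that do not belong to the current request.
        if correlation_id == self.corr_id:
            self.response = body

    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())  # unique per request
        self.server(reply_to=self.callback_queue,
                    correlation_id=self.corr_id,
                    body=n)
        for correlation_id, body in self.callback_queue:
            self.on_response(correlation_id, body)
        self.callback_queue.clear()
        return self.response


fibonacci_rpc = ToyRpcClient(toy_server)
result = fibonacci_rpc.call(4)
print("fib(4) is %r" % result)
```

Without the correlation id check, the client in this sketch would have no way to tell the stale reply from the real one.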
MySQL
One-to-many:
user table:
mysql> select * from user;
+----+----------+--------+
| id | name     | course |
+----+----------+--------+
|  2 | alice    |      1 |
|  3 | bob      |      1 |
|  4 | caroline |      2 |
|  5 | david    |      5 |
|  6 | emma     |   NULL |
+----+----------+--------+
course table:
mysql> select * from course;
+----+------------+
| id | name       |
+----+------------+
|  1 | html5      |
|  2 | css3       |
|  3 | javascript |
|  4 | php        |
|  5 | mysql      |
+----+------------+
Create the foreign key relationship:
ALTER TABLE `user`
ADD CONSTRAINT `FK_course`
FOREIGN KEY (`course`) REFERENCES `course` (`id`)
ON UPDATE CASCADE;
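The effect of the constraint can be tried without a MySQL server. The sketch below uses Python's built-in sqlite3 module as a stand-in, so it is SQLite rather than MySQL: the constraint is declared inside CREATE TABLE, and foreign keys must be switched on with a PRAGMA. The row for 'frank' and the new id 10 are made-up test values.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
# SQLite enforces foreign keys only when this pragma is enabled.
conn.execute('PRAGMA foreign_keys = ON')
conn.execute('CREATE TABLE course (id INTEGER PRIMARY KEY, name TEXT)')
conn.execute('''
CREATE TABLE "user" (
    id INTEGER PRIMARY KEY,
    name TEXT,
    course INTEGER,
    CONSTRAINT FK_course FOREIGN KEY (course) REFERENCES course (id)
        ON UPDATE CASCADE
)''')
conn.executemany('INSERT INTO course VALUES (?, ?)',
                 [(1, 'html5'), (2, 'css3')])
conn.executemany('INSERT INTO "user" VALUES (?, ?, ?)',
                 [(2, 'alice', 1), (6, 'emma', None)])

# A user pointing at a course that does not exist is rejected.
try:
    conn.execute('INSERT INTO "user" VALUES (?, ?, ?)', (7, 'frank', 99))
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

# Changing a course id is propagated to the referencing rows.
conn.execute('UPDATE course SET id = 10 WHERE id = 1')
alice_course = conn.execute(
    'SELECT course FROM "user" WHERE name = ?', ('alice',)).fetchone()[0]
print(rejected, alice_course)
```

A NULL course, as in emma's row, is allowed: the constraint only checks values that are actually present.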
Joining tables:
1) INNER JOIN:
mysql> SELECT user.name, course.name
-> FROM `user`
-> INNER JOIN `course` on user.course = course.id;
+----------+-------+
| name     | name  |
+----------+-------+
| alice    | html5 |
| bob      | html5 |
| caroline | css3  |
| david    | mysql |
+----------+-------+
An INNER JOIN produces the set of records that match in both the user and course tables.
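The same INNER JOIN can be reproduced from Python with the built-in sqlite3 module. This is a sketch that uses SQLite as a stand-in for MySQL and reloads the sample rows shown earlier. The ORDER BY clause is an addition, there only to make the row order predictable.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE course (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE "user" (id INTEGER PRIMARY KEY, name TEXT, course INTEGER);
INSERT INTO course VALUES (1, 'html5'), (2, 'css3'), (3, 'javascript'),
                          (4, 'php'), (5, 'mysql');
INSERT INTO "user" VALUES (2, 'alice', 1), (3, 'bob', 1), (4, 'caroline', 2),
                          (5, 'david', 5), (6, 'emma', NULL);
''')

# Only rows with a match on both sides survive an INNER JOIN.
inner_rows = conn.execute('''
SELECT u.name, c.name
FROM "user" AS u
INNER JOIN course AS c ON u.course = c.id
ORDER BY u.id
''').fetchall()

for user_name, course_name in inner_rows:
    print(user_name, course_name)
```

emma (no course) and the javascript and php courses (no users) are absent from the result, as in the MySQL output.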
2) LEFT JOIN:
mysql> SELECT user.name, course.name
-> FROM `user`
-> LEFT JOIN `course` on user.course = course.id;
+----------+-------+
| name     | name  |
+----------+-------+
| alice    | html5 |
| bob      | html5 |
| caroline | css3  |
| david    | mysql |
| emma     | NULL  |
+----------+-------+
A LEFT JOIN produces a set of records containing every entry in the left table (user), regardless of whether it has a matching entry in the right table (course). Where there is no match, the right-hand columns are NULL.
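What a LEFT JOIN does row by row can be written out in plain Python. This is a sketch of the idea, not of how MySQL executes the query: left_join is a hypothetical helper, the tables are lists of tuples copied from the sample data, and None plays the role of NULL.

```python
users = [
    (2, 'alice', 1),
    (3, 'bob', 1),
    (4, 'caroline', 2),
    (5, 'david', 5),
    (6, 'emma', None),
]
courses = [
    (1, 'html5'),
    (2, 'css3'),
    (3, 'javascript'),
    (4, 'php'),
    (5, 'mysql'),
]


def left_join(left_rows, right_rows):
    """Return (user name, course name) pairs, keeping every left row."""
    result = []
    for _user_id, user_name, course_id in left_rows:
        # No course has id None, so a missing course never matches here.
        matches = [name for cid, name in right_rows if cid == course_id]
        if matches:
            result.extend((user_name, name) for name in matches)
        else:
            # No match: keep the left row and fill the right side with None.
            result.append((user_name, None))
    return result


joined = left_join(users, courses)
for row in joined:
    print(row)
```

Dropping the else branch turns the helper into an INNER JOIN, which is the whole difference between the two.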
3) RIGHT JOIN:
mysql> SELECT user.name, course.name
-> FROM `user`
-> RIGHT JOIN `course` on user.course = course.id;
+----------+------------+
| name     | name       |
+----------+------------+
| alice    | html5      |
| bob      | html5      |
| caroline | css3       |
| NULL     | javascript |
| NULL     | php        |
| david    | mysql      |
+----------+------------+
A RIGHT JOIN produces a set of records containing every entry in the right table (course), regardless of whether it has a matching entry in the left table (user).
4) UNION (FULL OUTER JOIN):
mysql> SELECT user.name, course.name
-> FROM `user`
-> LEFT JOIN `course` on user.course = course.id
->
-> UNION
->
-> SELECT user.name, course.name
-> FROM `user`
-> RIGHT JOIN `course` on user.course = course.id;
+----------+------------+
| name     | name       |
+----------+------------+
| alice    | html5      |
| bob      | html5      |
| caroline | css3       |
| david    | mysql      |
| emma     | NULL       |
| NULL     | javascript |
| NULL     | php        |
+----------+------------+
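A FULL OUTER JOIN returns all records from both tables regardless of any match. Where no match exists, the missing side contains NULL. MySQL has no FULL OUTER JOIN syntax, so the query above emulates it by taking the UNION of a LEFT JOIN and a RIGHT JOIN.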
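The UNION trick can be checked from Python with the built-in sqlite3 module. This is a sketch with SQLite standing in for MySQL. Because older SQLite versions lack RIGHT JOIN, the second half is written as a LEFT JOIN with the tables swapped, which is assumed here to be equivalent for this query.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript('''
CREATE TABLE course (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE "user" (id INTEGER PRIMARY KEY, name TEXT, course INTEGER);
INSERT INTO course VALUES (1, 'html5'), (2, 'css3'), (3, 'javascript'),
                          (4, 'php'), (5, 'mysql');
INSERT INTO "user" VALUES (2, 'alice', 1), (3, 'bob', 1), (4, 'caroline', 2),
                          (5, 'david', 5), (6, 'emma', NULL);
''')

# First half keeps every user, second half keeps every course;
# UNION removes the matched rows that appear in both halves.
rows = conn.execute('''
SELECT u.name, c.name
FROM "user" AS u
LEFT JOIN course AS c ON u.course = c.id
UNION
SELECT u.name, c.name
FROM course AS c
LEFT JOIN "user" AS u ON u.course = c.id
''').fetchall()

full_outer = set(rows)
print(len(rows))
```

UNION also collapses rows that are genuinely identical, for example two users with the same name on the same course, so this emulation is only exact when the selected columns identify each row.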