    def post(self):
        # Validate the request body: 'message' is required.
        parser = reqparse.RequestParser()
        parser.add_argument(
            'message', required=True, help='message can not be blank!')
        args = parser.parse_args()
        msg = args['message']

        # Read every stored token from the notify table.
        with Database() as db, db.connect() as conn:
            with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
                cur.execute("SELECT token FROM notify")
                fetch = cur.fetchall()

        # Queue one SQS message per token.
        for f in fetch:
            body = {
                'token': f"Bearer {f['token']}",
                'message': f"Hello everyone, {msg}"
            }
            cli.send_message(
                # os.environ is a mapping, so it is indexed rather than called.
                QueueUrl=os.environ["SQS_URL"],
                DelaySeconds=0,
                MessageAttributes={},
                MessageBody=json.dumps(body),
            )
        return {'result': 'ok'}, 200
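The handler above sends one queue message per stored token. To exercise that fan-out without touching AWS or the database, the message-building step can be pulled into a pure function and the SQS client swapped for a stand-in. This is only a minimal sketch: `build_messages`, `fan_out`, and `FakeSQS` are hypothetical names that do not appear in the original code, and the fake client only mimics the keyword-argument shape of `send_message`.

```python
import json


def build_messages(tokens, msg):
    """Return one JSON-encoded message body per token, in the handler's format."""
    return [
        json.dumps({
            "token": f"Bearer {token}",
            "message": f"Hello everyone, {msg}",
        })
        for token in tokens
    ]


class FakeSQS:
    """Hypothetical stand-in for the SQS client; it only records calls."""

    def __init__(self):
        self.sent = []

    def send_message(self, **kwargs):
        self.sent.append(kwargs)
        return {"MessageId": str(len(self.sent))}


def fan_out(client, queue_url, tokens, msg):
    """Send one message per token and return how many were sent."""
    bodies = build_messages(tokens, msg)
    for body in bodies:
        client.send_message(
            QueueUrl=queue_url,
            DelaySeconds=0,
            MessageBody=body,
        )
    return len(bodies)
```

In the real handler, `client` would be the existing `cli` object and `tokens` would come from the `fetch` rows.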
With the code written, the next step is to add a route, /notify/sqs:
from controller.notify_sqs_controller import SendNotifyBySQSController
api.add_resource(SendNotifyBySQSController, '/notify/sqs')
Next, start a local server through wsgi:
sls wsgi serve
Then test it with Postman, using the following request body:
{
  "message": "test Content"
}
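The same request can also be sent from a script instead of Postman. The sketch below uses only the standard library. It assumes the local server listens on http://localhost:5000, so adjust `BASE_URL` to whatever address `sls wsgi serve` actually prints. `build_notify_request` and `send` are hypothetical helper names.

```python
import json
import urllib.request

# Assumption: address of the local server started by `sls wsgi serve`.
BASE_URL = "http://localhost:5000"


def build_notify_request(message, base_url=BASE_URL):
    """Build a POST request to /notify/sqs with a JSON body."""
    payload = json.dumps({"message": message}).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/notify/sqs",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(req):
    """Send the request and return (status code, decoded JSON body)."""
    with urllib.request.urlopen(req) as resp:
        return resp.status, json.loads(resp.read())
```

With the server running, `send(build_notify_request("test Content"))` performs the same call as the Postman test.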
Next, deploying with sls deploy runs into a problem: Access Denied. To fix this, add IAM role settings to serverless.yml.