4
4
import irc3
5
5
import datetime
6
6
from irc3 .compat import asyncio
7
- from concurrent .futures import ThreadPoolExecutor
8
7
9
8
__doc__ = '''
10
9
==========================================
22
21
irc3.plugins.feeds
23
22
24
23
[irc3.plugins.feeds]
25
- channels = #irc3 # global channel to notify
26
- delay = 5 # delay to check feeds
27
- directory = ~/.irc3/feeds # directory to store feeds
24
+ channels = #irc3 # global channel to notify
25
+ delay = 5 # delay to check feeds in minutes
26
+ directory = ~/.irc3/feeds # directory to store feeds
28
27
hook = irc3.plugins.feeds.default_hook # dotted name to a callable
29
28
fmt = [{name}] {entry.title} - {entry.link} # formatter
30
29
34
33
github/irc3.fmt = [{feed.name}] New commit: {entry.title} - {entry.link}
35
34
# custom channels
36
35
github/irc3.channels = #irc3dev #irc3
37
- # custom delay
36
+ # custom delay in minutes
38
37
github/irc3.delay = 10
39
38
40
39
Hook is a dotted name referring to a callable (function or class) which takes a
64
63
'''
65
64
66
65
# HTTP headers sent with every feed request.  The Cache-Control/Pragma
# pair asks servers and intermediate proxies for a fresh copy on each
# poll instead of a cached response.
HEADERS = {
    'User-Agent': 'python-aiohttp/irc3/feeds',
    'Cache-Control': 'max-age=0',
    'Pragma': 'no-cache',
}
@@ -82,21 +81,6 @@ def dispatcher(messages):
82
81
return dispatcher
83
82
84
83
85
- def fetch (args ):
86
- """fetch a feed"""
87
- requests = args ['requests' ]
88
- for feed , filename in zip (args ['feeds' ], args ['filenames' ]):
89
- try :
90
- resp = requests .get (feed , timeout = 5 , headers = HEADERS )
91
- content = resp .content
92
- except Exception : # pragma: no cover
93
- pass
94
- else :
95
- with open (filename , 'wb' ) as fd :
96
- fd .write (content )
97
- return args ['name' ]
98
-
99
-
100
84
# Timestamp layout written to / parsed from each feed's ``.updated``
# marker file (truncated ISO 8601, no timezone).
ISO_FORMAT = "%Y-%m-%dT%H:%M:%S"
101
85
102
86
@@ -108,7 +92,7 @@ def parse(feedparser, args):
108
92
109
93
for filename in args ['filenames' ]:
110
94
try :
111
- with open (filename + '.updated' ) as fd :
95
+ with open (filename + '.updated' , encoding = "UTF-8" ) as fd :
112
96
updated = datetime .datetime .strptime (
113
97
fd .read ()[:len ("YYYY-MM-DDTHH:MM:SS" )], ISO_FORMAT
114
98
)
@@ -146,8 +130,6 @@ def parse(feedparser, args):
146
130
class Feeds :
147
131
"""Feeds plugin"""
148
132
149
- PoolExecutor = ThreadPoolExecutor
150
-
151
133
def __init__ (self , bot ):
152
134
bot .feeds = self
153
135
self .bot = bot
@@ -207,7 +189,16 @@ def __init__(self, bot):
207
189
208
190
def connection_made (self ):
209
191
"""Initialize checkings"""
210
- self .bot .loop .call_later (10 , self .update )
192
+ self .bot .create_task (self .periodically_update ())
193
+
194
+ async def periodically_update (self ):
195
+ """After a connection has been made, call update feeds periodically."""
196
+ if not self .aiohttp or not self .feedparser :
197
+ return
198
+ await asyncio .sleep (10 )
199
+ while True :
200
+ await self .update ()
201
+ await asyncio .sleep (self .delay )
211
202
212
203
def imports (self ):
213
204
"""show some warnings if needed"""
@@ -218,14 +209,14 @@ def imports(self):
218
209
self .bot .log .critical ('feedparser is not installed' )
219
210
self .feedparser = None
220
211
try :
221
- import requests
212
+ import aiohttp
222
213
except ImportError : # pragma: no cover
223
- self .bot .log .critical ('requests is not installed' )
224
- self .requests = None
214
+ self .bot .log .critical ('aiohttp is not installed' )
215
+ self .aiohttp = None
225
216
else :
226
- self .requests = requests
217
+ self .aiohttp = aiohttp
227
218
228
- def parse (self , * args ):
219
+ def parse (self ):
229
220
"""parse pre-fetched feeds and notify new entries"""
230
221
entries = []
231
222
for feed in self .feeds .values ():
@@ -237,33 +228,37 @@ def messages():
237
228
if entry :
238
229
feed = entry .feed
239
230
message = feed ['fmt' ].format (feed = feed , entry = entry )
240
- for c in feed ['channels' ]:
241
- yield c , message
231
+ for channel in feed ['channels' ]:
232
+ yield channel , message
242
233
243
234
self .dispatcher (messages ())
244
235
245
- def update_time (self , future ):
246
- name = future .result ()
247
- self .bot .log .debug ('Feed %s fetched' , name )
248
- feed = self .feeds [name ]
249
- feed ['time' ] = time .time ()
250
-
251
- def update (self ):
236
+ async def update (self ):
252
237
"""update feeds"""
253
- loop = self .bot .loop
254
- loop .call_later (self .delay , self .update )
255
-
256
238
now = time .time ()
257
- feeds = [dict (f , requests = self .requests ) for f in self .feeds .values ()
258
- if f ['time' ] < now - f ['delay' ]]
259
- if feeds :
260
- self .bot .log .info ('Fetching feeds %s' ,
261
- ', ' .join ([f ['name' ] for f in feeds ]))
262
- tasks = []
263
- for feed in feeds :
264
- task = loop .run_in_executor (None , fetch , feed )
265
- task .add_done_callback (self .update_time )
266
- tasks .append (task )
267
- task = self .bot .create_task (
268
- asyncio .wait (tasks , timeout = len (feeds ) * 2 , loop = loop ))
269
- task .add_done_callback (self .parse )
239
+ feeds = [feed for feed in self .feeds .values ()
240
+ if feed ['time' ] < now - feed ['delay' ]]
241
+ if not feeds :
242
+ return
243
+ self .bot .log .info ('Fetching feeds %s' ,
244
+ ', ' .join ([f ['name' ] for f in feeds ]))
245
+ timeout = self .aiohttp .ClientTimeout (total = 5 )
246
+ async with self .aiohttp .ClientSession (timeout = timeout ) as session :
247
+ await asyncio .gather (
248
+ * [self .fetch (feed , session ) for feed in feeds ]
249
+ )
250
+ self .parse ()
251
+
252
+ async def fetch (self , feed , session ):
253
+ """fetch a feed"""
254
+ for url , filename in zip (feed ['feeds' ], feed ['filenames' ]):
255
+ try :
256
+ async with session .get (url , headers = HEADERS ) as resp :
257
+ with open (filename , 'wb' ) as file :
258
+ file .write (await resp .read ())
259
+ except Exception : # pragma: no cover
260
+ self .bot .log .exception (
261
+ "Exception while fetching feed %s" , feed ['name' ]
262
+ )
263
+ self .bot .log .debug ('Feed %s fetched' , feed ['name' ])
264
+ feed ['time' ] = time .time ()
0 commit comments