1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
import os

import psycopg2
from itemadapter import ItemAdapter
class IthomePipeline:
    """Pass-through pipeline stage that hands every item on untouched."""

    def process_item(self, item, spider):
        """Return *item* exactly as received.

        *spider* is part of the pipeline hook signature and is not used here.
        """
        unchanged = item
        return unchanged
class PostgreSqlPipeline:
    """Persist scraped articles and their responses into PostgreSQL.

    Each item is upserted into ``articles`` (conflict key ``url``), and each
    entry of ``item['responses']`` is upserted into ``public.response``
    (conflict key ``id``), linked by the article id returned from the first
    upsert. An article and its responses are written in one transaction.
    """

    # Item keys copied into the article row; order mirrors the INSERT columns.
    _ARTICLE_FIELDS = (
        'title', 'url', 'author', 'publish_time', 'tags', 'content', 'view_count',
    )

    # On a url conflict only title/tags/content (and update_time) are
    # refreshed; author, publish_time and view_count keep their stored values.
    _UPSERT_ARTICLE_SQL = """
        INSERT INTO articles(title, url, author, publish_time, tags, content, view_count)
        VALUES (%(title)s, %(url)s, %(author)s, %(publish_time)s, %(tags)s, %(content)s, %(view_count)s)
        ON CONFLICT(url) DO UPDATE
        SET title=EXCLUDED.title,
            tags=EXCLUDED.tags,
            content=EXCLUDED.content,
            update_time=current_timestamp
        RETURNING id;
    """

    # On an id conflict only the response content is refreshed.
    _UPSERT_RESPONSE_SQL = """
        INSERT INTO public.response(id, article_id, author, publish_time, content)
        VALUES (%(id)s, %(article_id)s, %(author)s, %(publish_time)s, %(content)s)
        ON CONFLICT(id) DO UPDATE
        SET content=EXCLUDED.content;
    """

    def open_spider(self, spider):
        """Open the connection and cursor used by ``process_item``.

        Connection settings are read from the standard libpq environment
        variables (PGHOST, PGUSER, PGPASSWORD, PGDATABASE), falling back to
        the previous hard-coded local defaults so existing setups keep working.
        """
        host = os.environ.get('PGHOST', 'localhost')
        user = os.environ.get('PGUSER', 'postgres')
        password = os.environ.get('PGPASSWORD', 'pg123456')
        dbname = os.environ.get('PGDATABASE', 'ithome')
        # Keyword arguments let psycopg2 build and quote the DSN itself; a
        # hand-formatted "key=value" string breaks on values with spaces/quotes.
        self.conn = psycopg2.connect(
            host=host, user=user, password=password, dbname=dbname,
        )
        print('資料庫連線成功!')
        self.cursor = self.conn.cursor()

    def close_spider(self, spider):
        """Close the cursor and connection opened in ``open_spider``."""
        self.cursor.close()
        self.conn.close()

    def process_item(self, item, spider):
        """Upsert *item* and its responses, then return *item* unchanged.

        ``item['responses']`` may be missing or ``None``; it is then treated
        as empty. Each response must provide ``resp_id``, ``author``,
        ``publish_time`` and ``content`` (a missing key raises ``KeyError``).
        Database errors propagate to the caller after the transaction has
        been rolled back.
        """
        article = {field: item.get(field) for field in self._ARTICLE_FIELDS}

        # `with conn` commits when the block succeeds and rolls back on any
        # exception, so the article and its responses are stored atomically
        # and a failing item cannot leave the connection stuck in an
        # aborted transaction that would break every following item.
        with self.conn:
            self.cursor.execute(self._UPSERT_ARTICLE_SQL, article)
            # Read the RETURNING row inside the transaction, before commit.
            article_id = self.cursor.fetchone()[0]

            for article_response in item.get('responses') or []:
                response = {
                    'id': article_response['resp_id'],
                    'article_id': article_id,
                    'author': article_response['author'],
                    'publish_time': article_response['publish_time'],
                    'content': article_response['content'],
                }
                self.cursor.execute(self._UPSERT_RESPONSE_SQL, response)

        return item
|